void ConnectionImpl::initImplFromLoop()

in tensorpipe/transport/ibv/connection_impl.cc [71:159]


void ConnectionImpl::initImplFromLoop() {
  context_->enroll(*this);

  Error error;
  // The connection either got a socket or an address, but not both.
  TP_DCHECK(socket_.hasValue() ^ sockaddr_.has_value());
  if (!socket_.hasValue()) {
    std::tie(error, socket_) =
        Socket::createForFamily(sockaddr_->addr()->sa_family);
    if (error) {
      setError(std::move(error));
      return;
    }
    error = socket_.reuseAddr(true);
    if (error) {
      setError(std::move(error));
      return;
    }
    error = socket_.connect(sockaddr_.value());
    if (error) {
      setError(std::move(error));
      return;
    }
  }
  // Ensure underlying control socket is non-blocking such that it
  // works well with event driven I/O.
  error = socket_.block(false);
  if (error) {
    setError(std::move(error));
    return;
  }

  // Create ringbuffer for inbox.
  std::tie(error, inboxBuf_) = MmappedPtr::create(
      kBufferSize, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1);
  TP_THROW_ASSERT_IF(error)
      << "Couldn't allocate ringbuffer for connection inbox: " << error.what();
  inboxRb_ =
      RingBuffer<kNumInboxRingbufferRoles>(&inboxHeader_, inboxBuf_.ptr());
  inboxMr_ = createIbvMemoryRegion(
      context_->getReactor().getIbvLib(),
      context_->getReactor().getIbvPd(),
      inboxBuf_.ptr(),
      kBufferSize,
      IbvLib::ACCESS_LOCAL_WRITE | IbvLib::ACCESS_REMOTE_WRITE);

  // Create ringbuffer for outbox.
  std::tie(error, outboxBuf_) = MmappedPtr::create(
      kBufferSize, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1);
  TP_THROW_ASSERT_IF(error)
      << "Couldn't allocate ringbuffer for connection outbox: " << error.what();
  outboxRb_ =
      RingBuffer<kNumOutboxRingbufferRoles>(&outboxHeader_, outboxBuf_.ptr());
  outboxMr_ = createIbvMemoryRegion(
      context_->getReactor().getIbvLib(),
      context_->getReactor().getIbvPd(),
      outboxBuf_.ptr(),
      kBufferSize,
      0);

  // Create and init queue pair.
  {
    IbvLib::qp_init_attr initAttr;
    std::memset(&initAttr, 0, sizeof(initAttr));
    initAttr.qp_type = IbvLib::QPT_RC;
    initAttr.send_cq = context_->getReactor().getIbvCq().get();
    initAttr.recv_cq = context_->getReactor().getIbvCq().get();
    initAttr.cap.max_send_wr = kSendQueueSize;
    initAttr.cap.max_send_sge = 1;
    initAttr.srq = context_->getReactor().getIbvSrq().get();
    initAttr.sq_sig_all = 1;
    qp_ = createIbvQueuePair(
        context_->getReactor().getIbvLib(),
        context_->getReactor().getIbvPd(),
        initAttr);
  }
  transitionIbvQueuePairToInit(
      context_->getReactor().getIbvLib(),
      qp_,
      context_->getReactor().getIbvAddress());

  // Register methods to be called when our peer writes to our inbox and reads
  // from our outbox.
  context_->getReactor().registerQp(qp_->qp_num, shared_from_this());

  // We're sending address first, so wait for writability.
  state_ = SEND_ADDR;
  context_->registerDescriptor(socket_.fd(), EPOLLOUT, shared_from_this());
}