void ChannelImpl::initImplFromLoop()

in tensorpipe/channel/mpt/channel_impl.cc [41:98]


void ChannelImpl::initImplFromLoop() {
  context_->enroll(*this);

  TP_DCHECK_EQ(state_, UNINITIALIZED);
  if (endpoint_ == Endpoint::kConnect) {
    state_ = CLIENT_READING_HELLO;
    auto nopHolderIn = std::make_shared<NopHolder<Packet>>();
    TP_VLOG(6) << "Channel " << id_ << " reading nop object (server hello)";
    connection_->read(
        *nopHolderIn, callbackWrapper_([nopHolderIn](ChannelImpl& impl) {
          TP_VLOG(6) << "Channel " << impl.id_
                     << " done reading nop object (server hello)";
          if (!impl.error_) {
            impl.onClientReadHelloOnConnection(nopHolderIn->getObject());
          }
        }));
  } else if (endpoint_ == Endpoint::kListen) {
    state_ = SERVER_ACCEPTING_LANES;
    const std::vector<std::string>& addresses = context_->addresses();
    TP_DCHECK_EQ(addresses.size(), numLanes_);
    auto nopHolderOut = std::make_shared<NopHolder<Packet>>();
    Packet& nopPacket = nopHolderOut->getObject();
    nopPacket.Become(nopPacket.index_of<ServerHello>());
    ServerHello& nopServerHello = *nopPacket.get<ServerHello>();
    for (uint64_t laneIdx = 0; laneIdx < numLanes_; ++laneIdx) {
      nopServerHello.laneAdvertisements.emplace_back();
      LaneAdvertisement& nopLaneAdvertisement =
          nopServerHello.laneAdvertisements.back();
      nopLaneAdvertisement.address = addresses[laneIdx];
      TP_VLOG(6) << "Channel " << id_ << " requesting connection (for lane "
                 << laneIdx << ")";
      uint64_t token = context_->registerConnectionRequest(
          laneIdx,
          callbackWrapper_(
              [laneIdx](
                  ChannelImpl& impl,
                  std::shared_ptr<transport::Connection> connection) {
                TP_VLOG(6) << "Channel " << impl.id_
                           << " done requesting connection (for lane "
                           << laneIdx << ")";
                if (!impl.error_) {
                  impl.onServerAcceptOfLane(laneIdx, std::move(connection));
                }
              }));
      laneRegistrationIds_.emplace(laneIdx, token);
      nopLaneAdvertisement.registrationId = token;
      numLanesBeingAccepted_++;
    }
    TP_VLOG(6) << "Channel " << id_ << " writing nop object (server hello)";
    connection_->write(
        *nopHolderOut, callbackWrapper_([nopHolderOut](ChannelImpl& impl) {
          TP_VLOG(6) << "Channel " << impl.id_
                     << " done writing nop object (server hello)";
        }));
  } else {
    TP_THROW_ASSERT() << "unknown endpoint";
  }
}