in tensorpipe/channel/mpt/channel_impl.cc [41:98]
void ChannelImpl::initImplFromLoop() {
context_->enroll(*this);
TP_DCHECK_EQ(state_, UNINITIALIZED);
if (endpoint_ == Endpoint::kConnect) {
state_ = CLIENT_READING_HELLO;
auto nopHolderIn = std::make_shared<NopHolder<Packet>>();
TP_VLOG(6) << "Channel " << id_ << " reading nop object (server hello)";
connection_->read(
*nopHolderIn, callbackWrapper_([nopHolderIn](ChannelImpl& impl) {
TP_VLOG(6) << "Channel " << impl.id_
<< " done reading nop object (server hello)";
if (!impl.error_) {
impl.onClientReadHelloOnConnection(nopHolderIn->getObject());
}
}));
} else if (endpoint_ == Endpoint::kListen) {
state_ = SERVER_ACCEPTING_LANES;
const std::vector<std::string>& addresses = context_->addresses();
TP_DCHECK_EQ(addresses.size(), numLanes_);
auto nopHolderOut = std::make_shared<NopHolder<Packet>>();
Packet& nopPacket = nopHolderOut->getObject();
nopPacket.Become(nopPacket.index_of<ServerHello>());
ServerHello& nopServerHello = *nopPacket.get<ServerHello>();
for (uint64_t laneIdx = 0; laneIdx < numLanes_; ++laneIdx) {
nopServerHello.laneAdvertisements.emplace_back();
LaneAdvertisement& nopLaneAdvertisement =
nopServerHello.laneAdvertisements.back();
nopLaneAdvertisement.address = addresses[laneIdx];
TP_VLOG(6) << "Channel " << id_ << " requesting connection (for lane "
<< laneIdx << ")";
uint64_t token = context_->registerConnectionRequest(
laneIdx,
callbackWrapper_(
[laneIdx](
ChannelImpl& impl,
std::shared_ptr<transport::Connection> connection) {
TP_VLOG(6) << "Channel " << impl.id_
<< " done requesting connection (for lane "
<< laneIdx << ")";
if (!impl.error_) {
impl.onServerAcceptOfLane(laneIdx, std::move(connection));
}
}));
laneRegistrationIds_.emplace(laneIdx, token);
nopLaneAdvertisement.registrationId = token;
numLanesBeingAccepted_++;
}
TP_VLOG(6) << "Channel " << id_ << " writing nop object (server hello)";
connection_->write(
*nopHolderOut, callbackWrapper_([nopHolderOut](ChannelImpl& impl) {
TP_VLOG(6) << "Channel " << impl.id_
<< " done writing nop object (server hello)";
}));
} else {
TP_THROW_ASSERT() << "unknown endpoint";
}
}