in tensorpipe/core/pipe_impl.cc [265:342]
void PipeImpl::initFromLoop() {
TP_DCHECK(context_->inLoop());
if (context_->closed()) {
// Set the error without calling setError because we do not want to invoke
// handleError as it would find itself in a weird state (since the rest of
// initFromLoop wouldn't have been called).
error_ = TP_CREATE_ERROR(PipeClosedError);
TP_VLOG(1) << "Pipe " << id_ << " is closing (without initing)";
return;
}
context_->enroll(*this);
if (state_ == CLIENT_ABOUT_TO_SEND_HELLO_AND_BROCHURE) {
auto nopHolderOut = std::make_shared<NopHolder<Packet>>();
Packet& nopPacketOut = nopHolderOut->getObject();
nopPacketOut.Become(nopPacketOut.index_of<SpontaneousConnection>());
SpontaneousConnection& nopSpontaneousConnection =
*nopPacketOut.get<SpontaneousConnection>();
nopSpontaneousConnection.contextName = context_->getName();
TP_VLOG(3) << "Pipe " << id_
<< " is writing nop object (spontaneous connection)";
descriptorConnection_->write(
*nopHolderOut, callbackWrapper_([nopHolderOut](PipeImpl& impl) {
TP_VLOG(3) << "Pipe " << impl.id_
<< " done writing nop object (spontaneous connection)";
}));
auto nopHolderOut2 = std::make_shared<NopHolder<Brochure>>();
Brochure& nopBrochure = nopHolderOut2->getObject();
for (const auto& transportContextIter : context_->getOrderedTransports()) {
const std::string& transportName =
std::get<0>(transportContextIter.second);
const transport::Context& transportContext =
*(std::get<1>(transportContextIter.second));
nopBrochure.transportDomainDescriptors[transportName] =
transportContext.domainDescriptor();
}
for (const auto& channelContextIter : context_->getOrderedChannels()) {
const std::string& channelName = std::get<0>(channelContextIter.second);
const channel::Context& channelContext =
*(std::get<1>(channelContextIter.second));
nopBrochure.channelDeviceDescriptors[channelName] =
channelContext.deviceDescriptors();
}
TP_VLOG(3) << "Pipe " << id_ << " is writing nop object (brochure)";
descriptorConnection_->write(
*nopHolderOut2, callbackWrapper_([nopHolderOut2](PipeImpl& impl) {
TP_VLOG(3) << "Pipe " << impl.id_
<< " done writing nop object (brochure)";
}));
state_ = CLIENT_WAITING_FOR_BROCHURE_ANSWER;
auto nopHolderIn = std::make_shared<NopHolder<BrochureAnswer>>();
TP_VLOG(3) << "Pipe " << id_ << " is reading nop object (brochure answer)";
descriptorConnection_->read(
*nopHolderIn, callbackWrapper_([nopHolderIn](PipeImpl& impl) {
TP_VLOG(3) << "Pipe " << impl.id_
<< " done reading nop object (brochure answer)";
if (!impl.error_) {
impl.onReadWhileClientWaitingForBrochureAnswer(
nopHolderIn->getObject());
}
}));
}
if (state_ == SERVER_WAITING_FOR_BROCHURE) {
auto nopHolderIn = std::make_shared<NopHolder<Brochure>>();
TP_VLOG(3) << "Pipe " << id_ << " is reading nop object (brochure)";
descriptorConnection_->read(
*nopHolderIn, callbackWrapper_([nopHolderIn](PipeImpl& impl) {
TP_VLOG(3) << "Pipe " << impl.id_
<< " done reading nop object (brochure)";
if (!impl.error_) {
impl.onReadWhileServerWaitingForBrochure(nopHolderIn->getObject());
}
}));
}
}