void PipeImpl::initFromLoop()

in tensorpipe/core/pipe_impl.cc [265:342]


void PipeImpl::initFromLoop() {
  TP_DCHECK(context_->inLoop());

  if (context_->closed()) {
    // Set the error without calling setError because we do not want to invoke
    // handleError as it would find itself in a weird state (since the rest of
    // initFromLoop wouldn't have been called).
    error_ = TP_CREATE_ERROR(PipeClosedError);
    TP_VLOG(1) << "Pipe " << id_ << " is closing (without initing)";
    return;
  }

  context_->enroll(*this);

  if (state_ == CLIENT_ABOUT_TO_SEND_HELLO_AND_BROCHURE) {
    auto nopHolderOut = std::make_shared<NopHolder<Packet>>();
    Packet& nopPacketOut = nopHolderOut->getObject();
    nopPacketOut.Become(nopPacketOut.index_of<SpontaneousConnection>());
    SpontaneousConnection& nopSpontaneousConnection =
        *nopPacketOut.get<SpontaneousConnection>();
    nopSpontaneousConnection.contextName = context_->getName();
    TP_VLOG(3) << "Pipe " << id_
               << " is writing nop object (spontaneous connection)";
    descriptorConnection_->write(
        *nopHolderOut, callbackWrapper_([nopHolderOut](PipeImpl& impl) {
          TP_VLOG(3) << "Pipe " << impl.id_
                     << " done writing nop object (spontaneous connection)";
        }));

    auto nopHolderOut2 = std::make_shared<NopHolder<Brochure>>();
    Brochure& nopBrochure = nopHolderOut2->getObject();
    for (const auto& transportContextIter : context_->getOrderedTransports()) {
      const std::string& transportName =
          std::get<0>(transportContextIter.second);
      const transport::Context& transportContext =
          *(std::get<1>(transportContextIter.second));
      nopBrochure.transportDomainDescriptors[transportName] =
          transportContext.domainDescriptor();
    }
    for (const auto& channelContextIter : context_->getOrderedChannels()) {
      const std::string& channelName = std::get<0>(channelContextIter.second);
      const channel::Context& channelContext =
          *(std::get<1>(channelContextIter.second));
      nopBrochure.channelDeviceDescriptors[channelName] =
          channelContext.deviceDescriptors();
    }
    TP_VLOG(3) << "Pipe " << id_ << " is writing nop object (brochure)";
    descriptorConnection_->write(
        *nopHolderOut2, callbackWrapper_([nopHolderOut2](PipeImpl& impl) {
          TP_VLOG(3) << "Pipe " << impl.id_
                     << " done writing nop object (brochure)";
        }));
    state_ = CLIENT_WAITING_FOR_BROCHURE_ANSWER;
    auto nopHolderIn = std::make_shared<NopHolder<BrochureAnswer>>();
    TP_VLOG(3) << "Pipe " << id_ << " is reading nop object (brochure answer)";
    descriptorConnection_->read(
        *nopHolderIn, callbackWrapper_([nopHolderIn](PipeImpl& impl) {
          TP_VLOG(3) << "Pipe " << impl.id_
                     << " done reading nop object (brochure answer)";
          if (!impl.error_) {
            impl.onReadWhileClientWaitingForBrochureAnswer(
                nopHolderIn->getObject());
          }
        }));
  }
  if (state_ == SERVER_WAITING_FOR_BROCHURE) {
    auto nopHolderIn = std::make_shared<NopHolder<Brochure>>();
    TP_VLOG(3) << "Pipe " << id_ << " is reading nop object (brochure)";
    descriptorConnection_->read(
        *nopHolderIn, callbackWrapper_([nopHolderIn](PipeImpl& impl) {
          TP_VLOG(3) << "Pipe " << impl.id_
                     << " done reading nop object (brochure)";
          if (!impl.error_) {
            impl.onReadWhileServerWaitingForBrochure(nopHolderIn->getObject());
          }
        }));
  }
}