void AcceptRoutingHandler::onRoutingData()

in wangle/bootstrap/AcceptRoutingHandler-inl.h [75:129]


void AcceptRoutingHandler<Pipeline, R>::onRoutingData(
    uint64_t connId,
    typename RoutingDataHandler<R>::RoutingData& routingData) {
  // Get the routing pipeline corresponding to this connection
  auto routingPipelineIter = routingPipelines_.find(connId);
  if (routingPipelineIter == routingPipelines_.end()) {
    VLOG(2) << "Connection has already been closed, "
               "or routed to a worker thread.";
    return;
  }
  auto routingPipeline = std::move(routingPipelineIter->second);
  routingPipelines_.erase(routingPipelineIter);

  // Fetch the socket from the pipeline and pause reading from the
  // socket
  auto socket = std::dynamic_pointer_cast<folly::AsyncTransport>(
      routingPipeline->getTransport());
  CHECK(socket);
  routingPipeline->transportInactive();
  auto originalEvb = socket->getEventBase();
  socket->detachEventBase();

  // Hash based on routing data to pick a new acceptor
  uint64_t hash = std::hash<R>()(routingData.routingData);
  auto acceptor = acceptors_[hash % acceptors_.size()];

  originalEvb->runInLoop(
      [=, routingData = std::move(routingData)]() mutable {
        // Switch to the new acceptor's thread
        acceptor->getEventBase()->runInEventBaseThread(
            [=, routingData = std::move(routingData)]() mutable {
              socket->attachEventBase(acceptor->getEventBase());

              auto routingHandler =
                  routingPipeline->template getHandler<RoutingDataHandler<R>>();
              DCHECK(routingHandler);
              auto transportInfo = routingPipeline->getTransportInfo();
              auto pipeline = childPipelineFactory_->newPipeline(
                  socket,
                  routingData.routingData,
                  routingHandler,
                  transportInfo);

              auto connection = new
                  typename ServerAcceptor<Pipeline>::ServerConnection(pipeline);
              acceptor->addConnection(connection);

              pipeline->transportActive();

              // Pass in the buffered bytes to the pipeline
              pipeline->read(routingData.bufQueue);
            });
      },
      /* thisIteration = */ true);
}