void QuicServerWorker::dispatchPacketData()

in quic/server/QuicServerWorker.cpp [552:858]


void QuicServerWorker::dispatchPacketData(
    const folly::SocketAddress& client,
    RoutingData&& routingData,
    NetworkData&& networkData,
    folly::Optional<QuicVersion> quicVersion,
    bool isForwardedData) noexcept {
  DCHECK(socket_);
  QuicServerTransport::Ptr transport;
  bool dropPacket = false;
  auto cit = connectionIdMap_.find(routingData.destinationConnId);
  if (cit != connectionIdMap_.end()) {
    transport = cit->second;
    VLOG(10) << "Found existing connection for CID="
             << routingData.destinationConnId.hex() << " " << *transport;
  } else if (routingData.headerForm != HeaderForm::Long) {
    // Drop the packet if the header form is not long
    VLOG(3) << fmt::format(
        "Dropping non-long header packet with no connid match"
        " headerForm={}, routingInfo={}",
        static_cast<typename std::underlying_type<HeaderForm>::type>(
            routingData.headerForm),
        logRoutingInfo(routingData.destinationConnId));
    // Try forwarding the packet to the old server (if it is enabled)
    dropPacket = true;
  }

  bool cannotMakeTransport = false;
  if (!dropPacket && !transport) {
    // For LongHeader packets without existing associated connection, try to
    // route with destinationConnId chosen by the peer and IP address of the
    // peer.
    CHECK(transportFactory_);
    auto source = std::make_pair(client, routingData.destinationConnId);
    auto sit = sourceAddressMap_.find(source);
    if (sit == sourceAddressMap_.end()) {
      // If it's a 0RTT packet and we have no CID, we probably lost the initial
      // and want to buffer it for a while.
      if (routingData.is0Rtt) {
        auto itr = pending0RttData_.find(routingData.destinationConnId);
        if (itr == pending0RttData_.end()) {
          itr =
              pending0RttData_.insert(routingData.destinationConnId, {}).first;
        }
        auto& vec = itr->second;
        if (vec.size() != vec.max_size()) {
          vec.emplace_back(std::move(networkData));
          QUIC_STATS(statsCallback_, onZeroRttBuffered);
        }
        return;
      } else if (!routingData.isInitial) {
        VLOG(3) << fmt::format(
            "Dropping packet from client={}, routingInfo={}",
            client.describe(),
            logRoutingInfo(routingData.destinationConnId));
        dropPacket = true;
      } else {
        VLOG(4) << fmt::format(
            "Creating new connection for client={}, routingInfo={}",
            client.describe(),
            logRoutingInfo(routingData.destinationConnId));

        // This could be a new connection, add it in the map
        // verify that the initial packet is at least min initial bytes
        // to avoid amplification attacks.
        if (networkData.totalData < kMinInitialPacketSize) {
          // Don't even attempt to forward the packet, just drop it.
          VLOG(3) << "Dropping small initial packet from client=" << client;
          QUIC_STATS(
              statsCallback_,
              onPacketDropped,
              PacketDropReason::INVALID_PACKET);
          return;
        }

        // If there is a token present, decrypt it (could be either a retry
        // token or a new token)
        folly::io::Cursor cursor(networkData.packets.front().get());
        auto maybeEncryptedToken = maybeGetEncryptedToken(cursor);
        bool hasTokenSecret = transportSettings_.retryTokenSecret.hasValue();

        // If the retryTokenSecret is not set, just skip evaluating validity of
        // token and assume true
        auto isValidRetryToken = !hasTokenSecret ||
            (maybeEncryptedToken &&
             validRetryToken(
                 *maybeEncryptedToken,
                 routingData.destinationConnId,
                 client.getIPAddress()));

        auto isValidNewToken = !hasTokenSecret ||
            (maybeEncryptedToken &&
             validNewToken(*maybeEncryptedToken, client.getIPAddress()));

        if (isValidNewToken) {
          QUIC_STATS(statsCallback_, onNewTokenReceived);
        } else if (maybeEncryptedToken && !isValidRetryToken) {
          // Failed to decrypt the token as either a new or retry token
          QUIC_STATS(statsCallback_, onTokenDecryptFailure);
        }

        // If rate-limiting is configured and there is no retry token,
        // send a retry packet back to the client
        if (!isValidRetryToken &&
            ((newConnRateLimiter_ &&
              newConnRateLimiter_->check(networkData.receiveTimePoint)) ||
             (unfinishedHandshakeLimitFn_.has_value() &&
              globalUnfinishedHandshakes >=
                  (*unfinishedHandshakeLimitFn_)()))) {
          if (hasTokenSecret) {
            sendRetryPacket(
                client,
                routingData.destinationConnId,
                routingData.sourceConnId.value_or(
                    ConnectionId(std::vector<uint8_t>())));
            QUIC_STATS(statsCallback_, onConnectionRateLimited);
            return;
          } else {
            VLOG(4)
                << "Not sending retry packet since retry token secret is not set";
          }
        }

        // Check that we have a proper quic version before creating transport.
        CHECK(quicVersion.has_value())
            << "no QUIC version supplied for transport creation";

        // create 'accepting' transport
        auto sock = makeSocket(getEventBase());

        auto trans = transportFactory_->make(
            getEventBase(), std::move(sock), client, quicVersion.value(), ctx_);
        if (!trans) {
          dropPacket = true;
          cannotMakeTransport = true;
        } else {
          globalUnfinishedHandshakes++;
          CHECK(trans);
          if (transportSettings_.dataPathType ==
                  DataPathType::ContinuousMemory &&
              bufAccessor_) {
            trans->setBufAccessor(bufAccessor_.get());
          }
          trans->setPacingTimer(pacingTimer_);
          trans->setRoutingCallback(this);
          trans->setHandshakeFinishedCallback(this);
          trans->setSupportedVersions(supportedVersions_);
          trans->setOriginalPeerAddress(client);
#ifdef CCP_ENABLED
          trans->setCcpDatapath(getCcpReader()->getDatapath());
#endif
          trans->setCongestionControllerFactory(ccFactory_);
          if (statsCallback_) {
            trans->setTransportStatsCallback(statsCallback_.get());
          }
          auto overridenTransportSettings = transportSettingsOverrideFn_
              ? transportSettingsOverrideFn_(
                    transportSettings_, client.getIPAddress())
              : folly::none;

          if (overridenTransportSettings) {
            if (overridenTransportSettings->dataPathType !=
                transportSettings_.dataPathType) {
              // It's too complex to support that.
              LOG(ERROR)
                  << "Overriding DataPathType isn't supported. Requested datapath="
                  << (overridenTransportSettings->dataPathType ==
                              DataPathType::ContinuousMemory
                          ? "ContinuousMemory"
                          : "ChainedMemory");
            }
            trans->setTransportSettings(*overridenTransportSettings);
          } else {
            trans->setTransportSettings(transportSettings_);
          }
          trans->setConnectionIdAlgo(connIdAlgo_.get());
          trans->setServerConnectionIdRejector(this);
          if (routingData.sourceConnId) {
            trans->setClientConnectionId(*routingData.sourceConnId);
          }
          trans->setClientChosenDestConnectionId(routingData.destinationConnId);
          // parameters to create server chosen connection id
          ServerConnectionIdParams serverConnIdParams(
              cidVersion_,
              hostId_,
              static_cast<uint8_t>(processId_),
              workerId_);
          trans->setServerConnectionIdParams(std::move(serverConnIdParams));
          trans->accept();
          auto result = sourceAddressMap_.emplace(std::make_pair(
              std::make_pair(client, routingData.destinationConnId), trans));
          if (!result.second) {
            LOG(ERROR) << fmt::format(
                "Routing entry already exists for client={}, routingInfo={}",
                client.describe(),
                logRoutingInfo(routingData.destinationConnId));
            dropPacket = true;
          } else {
            for (const auto& observer : observerList_.getAll()) {
              observer->accept(trans.get());
            }
          }
          transport = trans;
        }
      }
    } else {
      transport = sit->second;
      VLOG(4) << "Found existing connection for client=" << client << " "
              << *transport;
    }
  }
  if (!dropPacket) {
    DCHECK(transport->getEventBase()->isInEventBaseThread());
    transport->onNetworkData(client, std::move(networkData));
    // If we had pending 0RTT data for this DCID, process it.
    if (routingData.isInitial && !pending0RttData_.empty()) {
      auto itr = pending0RttData_.find(routingData.destinationConnId);
      if (itr != pending0RttData_.end()) {
        for (auto& data : itr->second) {
          transport->onNetworkData(client, std::move(data));
        }
        pending0RttData_.erase(itr);
      }
    }
    return;
  }
  if (cannotMakeTransport) {
    VLOG(3)
        << "Dropping packet due to transport factory did not make transport";
    QUIC_STATS(
        statsCallback_,
        onPacketDropped,
        PacketDropReason::CANNOT_MAKE_TRANSPORT);
    return;
  }
  if (!connIdAlgo_->canParse(routingData.destinationConnId)) {
    VLOG(3) << "Dropping packet with bad DCID, routingInfo="
            << logRoutingInfo(routingData.destinationConnId);
    QUIC_STATS(statsCallback_, onPacketDropped, PacketDropReason::PARSE_ERROR);
    // TODO do we need to reset?
    return;
  }
  auto connIdParam =
      connIdAlgo_->parseConnectionId(routingData.destinationConnId);
  if (connIdParam.hasError()) {
    VLOG(3) << fmt::format(
        "Dropping packet due to DCID parsing error={}, , errorCode={}, routingInfo={}",
        connIdParam.error().what(),
        folly::to<std::string>(connIdParam.error().errorCode()),
        logRoutingInfo(routingData.destinationConnId));
    QUIC_STATS(statsCallback_, onPacketDropped, PacketDropReason::PARSE_ERROR);
    // TODO do we need to reset?
    return;
  }
  if (connIdParam->hostId != hostId_) {
    VLOG_EVERY_N(2, 100) << fmt::format(
        "Dropping packet routed to wrong host, from client={}, routingInfo={},",
        client.describe(),
        logRoutingInfo(routingData.destinationConnId));
    QUIC_STATS(
        statsCallback_,
        onPacketDropped,
        PacketDropReason::ROUTING_ERROR_WRONG_HOST);
    return sendResetPacket(
        routingData.headerForm,
        client,
        networkData,
        routingData.destinationConnId);
  }

  if (!packetForwardingEnabled_ || isForwardedData) {
    QUIC_STATS(
        statsCallback_,
        onPacketDropped,
        PacketDropReason::CONNECTION_NOT_FOUND);
    return sendResetPacket(
        routingData.headerForm,
        client,
        networkData,
        routingData.destinationConnId);
  }

  // There's no existing connection for the packet's CID or the client's
  // addr, and doesn't belong to the old server. Send a Reset.
  if (connIdParam->processId == static_cast<uint8_t>(processId_)) {
    QUIC_STATS(
        statsCallback_,
        onPacketDropped,
        PacketDropReason::CONNECTION_NOT_FOUND);
    return sendResetPacket(
        routingData.headerForm,
        client,
        networkData,
        routingData.destinationConnId);
  }

  // Optimistically route to another server
  // if the packet type is not Initial and if there is not any connection
  // associated with the given packet
  VLOG(4) << fmt::format(
      "Forwarding packet from client={} to another process, routingInfo={}",
      client.describe(),
      logRoutingInfo(routingData.destinationConnId));
  auto recvTime = networkData.receiveTimePoint;
  takeoverPktHandler_.forwardPacketToAnotherServer(
      client, std::move(networkData).moveAllData(), recvTime);
  QUIC_STATS(statsCallback_, onPacketForwarded);
}