void QuicClientTransport::processPacketData()

in quic/client/QuicClientTransport.cpp [175:770]


// Parses one packet out of packetQueue and applies it to the client
// connection state.
//
// Outline of what this function does, in order:
//  1. Returns immediately if packetQueue is empty.
//  2. Parses a packet with conn_->readCodec->parsePacket() and dispatches on
//     the parse result:
//       - stateless reset with a matching token: records CONNECTION_RESET as
//         the peer error and throws.
//       - retry packet: validates it, rebuilds the client connection state
//         via undoAllClientStateForRetry(), stores the retry token and
//         restarts the crypto handshake, then returns.
//       - cipher unavailable (KeyPhaseZero / Handshake): buffers the packet
//         for later, subject to transportSettings.maxPacketsToBuffer, then
//         returns.
//       - anything that is not a regular packet: recorded as a PARSE_ERROR
//         drop, then returns.
//  3. For a regular packet: validates frames and connection id, updates the
//     ack state, and processes every frame.
//  4. Reads any buffered crypto data, drives the handshake, and handles
//     1-RTT key derivation, 0-RTT accept/reject and server transport
//     parameters.
//  5. Updates the ack-send state and, on Handshake-level packets, drops the
//     initial ciphers.
//
// Parameters:
//   peer              address the packet came from; forwarded to
//                     happyEyeballsOnDataReceived() and stored alongside
//                     buffered packets.
//   receiveTimePoint  receive time of the packet; forwarded to the ack-state
//                     update, processAckFrame(), handleDatagram(), and stored
//                     alongside buffered packets.
//   packetQueue       queue holding the packet bytes; its chain length before
//                     parsing is used as the packet size for qlog.
//
// Throws:
//   QuicInternalException   when a stateless reset with a matching token is
//                           received.
//   QuicTransportException  PROTOCOL_VIOLATION (packet without frames,
//                           disallowed frame in a non-KeyPhase packet,
//                           connection id mismatch), STREAM_STATE_ERROR
//                           (MaxStreamData for a receiving stream), NO_ERROR
//                           (peer sent a connection close),
//                           TRANSPORT_PARAMETER_ERROR (missing server
//                           transport params, or flow control limits shrank
//                           after a 0-RTT rejection).
//   Helpers called from here may throw as well; that is not visible in this
//   function.
void QuicClientTransport::processPacketData(
    const folly::SocketAddress& peer,
    TimePoint receiveTimePoint,
    BufQueue& packetQueue) {
  // Capture the size up front: it is what gets reported to qlog below.
  // Presumably parsePacket() consumes from the queue -- confirm.
  auto packetSize = packetQueue.chainLength();
  if (packetSize == 0) {
    return;
  }
  auto parsedPacket = conn_->readCodec->parsePacket(
      packetQueue, conn_->ackStates, conn_->clientConnectionId->size());

  // --- Stateless reset ---
  StatelessReset* statelessReset = parsedPacket.statelessReset();
  if (statelessReset) {
    const auto& token = clientConn_->statelessResetToken;
    if (statelessReset->token == token) {
      VLOG(4) << "Received Stateless Reset " << *this;
      // Record the reset as the peer's error before unwinding via the
      // exception.
      conn_->peerConnectionError = QuicError(
          QuicErrorCode(LocalErrorCode::CONNECTION_RESET),
          toString(LocalErrorCode::CONNECTION_RESET).str());
      throw QuicInternalException("Peer reset", LocalErrorCode::NO_ERROR);
    }
    // Token mismatch: only logged. There is no return here, so execution
    // continues with the checks below (presumably ending in the parse-error
    // drop path, since this result is not a regular packet -- confirm).
    VLOG(4) << "Drop StatelessReset for bad connId or token " << *this;
  }

  // --- Retry packet ---
  RetryPacket* retryPacket = parsedPacket.retryPacket();
  if (retryPacket) {
    if (conn_->qLogger) {
      conn_->qLogger->addPacket(*retryPacket, packetSize, true);
    }

    // retryToken is set at the end of this branch, so a non-empty token means
    // a retry has already been processed; further retries are ignored.
    if (!clientConn_->retryToken.empty()) {
      VLOG(4) << "Server sent more than one retry packet";
      return;
    }

    const ConnectionId* originalDstConnId =
        &(*clientConn_->originalDestinationConnectionId);

    if (!clientConn_->clientHandshakeLayer->verifyRetryIntegrityTag(
            *originalDstConnId, *retryPacket)) {
      VLOG(4) << "The integrity tag in the retry packet was invalid. "
              << "Dropping bad retry packet.";
      return;
    }

    // NOTE(review): unlike the regular-packet path below, there is no
    // CHECK(socket_) before this call -- confirm socket_ cannot be null here.
    if (happyEyeballsEnabled_) {
      happyEyeballsOnDataReceived(
          *clientConn_, happyEyeballsConnAttemptDelayTimeout_, socket_, peer);
    }
    // Set the destination connection ID to be the value from the source
    // connection id of the retry packet
    clientConn_->initialDestinationConnectionId =
        retryPacket->header.getSourceConnId();

    // Take the connection state out of conn_ as a QuicClientConnectionState
    // so it can be handed to undoAllClientStateForRetry(). The state it
    // returns is installed back into conn_, and clientConn_ is re-pointed at
    // it.
    // NOTE(review): conn_ is null between release() and reset(); if
    // undoAllClientStateForRetry() can throw, conn_ stays null and
    // clientConn_ keeps its old value -- confirm that cannot happen.
    auto released = static_cast<QuicClientConnectionState*>(conn_.release());
    std::unique_ptr<QuicClientConnectionState> uniqueClient(released);
    auto tempConn = undoAllClientStateForRetry(std::move(uniqueClient));

    clientConn_ = tempConn.get();
    conn_.reset(tempConn.release());

    // Stored on the new state; this is also what makes the duplicate-retry
    // check above fire for any later retry packet.
    clientConn_->retryToken = retryPacket->header.getToken();

    // TODO (amsharma): add a "RetryPacket" QLog event, and log it here.
    // TODO (amsharma): verify the "original_connection_id" parameter
    // upon receiving a subsequent initial from the server.

    startCryptoHandshake();
    return;
  }

  // --- Cipher not yet available: buffer the packet for later ---
  // Only KeyPhaseZero and Handshake packets with a non-empty payload are
  // buffered, and only while the combined size of both pending lists is
  // below transportSettings.maxPacketsToBuffer. If any condition fails,
  // execution falls through to the regular-packet check below.
  auto cipherUnavailable = parsedPacket.cipherUnavailable();
  if (cipherUnavailable && cipherUnavailable->packet &&
      !cipherUnavailable->packet->empty() &&
      (cipherUnavailable->protectionType == ProtectionType::KeyPhaseZero ||
       cipherUnavailable->protectionType == ProtectionType::Handshake) &&
      clientConn_->pendingOneRttData.size() +
              clientConn_->pendingHandshakeData.size() <
          clientConn_->transportSettings.maxPacketsToBuffer) {
    // KeyPhaseZero packets go to pendingOneRttData, Handshake packets to
    // pendingHandshakeData.
    auto& pendingData =
        cipherUnavailable->protectionType == ProtectionType::KeyPhaseZero
        ? clientConn_->pendingOneRttData
        : clientConn_->pendingHandshakeData;
    // The packet buffer is moved out of the parse result and stored together
    // with its receive time and the peer address.
    pendingData.emplace_back(
        NetworkDataSingle(
            std::move(cipherUnavailable->packet), receiveTimePoint),
        peer);
    if (conn_->qLogger) {
      conn_->qLogger->addPacketBuffered(
          cipherUnavailable->protectionType, packetSize);
    }
    return;
  }

  // --- Anything that is not a regular packet is dropped as a parse error ---
  RegularQuicPacket* regularOptional = parsedPacket.regularPacket();
  if (!regularOptional) {
    // NOTE(review): this site uses the transport's statsCallback_ member,
    // while every other stats site in this function uses
    // conn_->statsCallback -- confirm both refer to the same callback.
    QUIC_STATS(statsCallback_, onPacketDropped, PacketDropReason::PARSE_ERROR);
    if (conn_->qLogger) {
      conn_->qLogger->addPacketDrop(packetSize, kParse);
    }
    return;
  }

  if (regularOptional->frames.empty()) {
    // This is either a packet that has no data (long-header parsed but no data
    // found) or a regular packet with a short header and no frames. Both are
    // protocol violations.
    // The drop is recorded in stats and qlog first, then the exception is
    // thrown.
    QUIC_STATS(
        conn_->statsCallback,
        onPacketDropped,
        PacketDropReason::PROTOCOL_VIOLATION);
    if (conn_->qLogger) {
      conn_->qLogger->addPacketDrop(
          packetSize,
          QuicTransportStatsCallback::toString(
              PacketDropReason::PROTOCOL_VIOLATION));
    }
    throw QuicTransportException(
        "Packet has no frames", TransportErrorCode::PROTOCOL_VIOLATION);
  }

  if (happyEyeballsEnabled_) {
    CHECK(socket_);
    happyEyeballsOnDataReceived(
        *clientConn_, happyEyeballsConnAttemptDelayTimeout_, socket_, peer);
  }

  // Both header views are taken; they are null-checked individually below
  // (see the connection id checks).
  LongHeader* longHeader = regularOptional->header.asLong();
  ShortHeader* shortHeader = regularOptional->header.asShort();

  auto protectionLevel = regularOptional->header.getProtectionType();
  auto encryptionLevel = protectionTypeToEncryptionLevel(protectionLevel);

  auto packetNum = regularOptional->header.getPacketSequenceNum();
  auto pnSpace = regularOptional->header.getPacketNumberSpace();

  // KeyPhaseZero / KeyPhaseOne packets may carry any frame type; every other
  // protection type is restricted to the frame types checked below.
  bool isProtectedPacket = protectionLevel == ProtectionType::KeyPhaseZero ||
      protectionLevel == ProtectionType::KeyPhaseOne;

  auto& regularPacket = *regularOptional;
  if (conn_->qLogger) {
    conn_->qLogger->addPacket(regularPacket, packetSize);
  }
  if (!isProtectedPacket) {
    // Only padding, ack, connection close, crypto and ping frames are
    // accepted here; any other frame type is a protocol violation.
    for (auto& quicFrame : regularPacket.frames) {
      auto isPadding = quicFrame.asPaddingFrame();
      auto isAck = quicFrame.asReadAckFrame();
      auto isClose = quicFrame.asConnectionCloseFrame();
      auto isCrypto = quicFrame.asReadCryptoFrame();
      auto isPing = quicFrame.asPingFrame();
      // TODO: add path challenge and response
      if (!isPadding && !isAck && !isClose && !isCrypto && !isPing) {
        throw QuicTransportException(
            "Invalid frame", TransportErrorCode::PROTOCOL_VIOLATION);
      }
    }
  }

  // We got a packet that was not the version negotiation packet, that means
  // that the version is now bound to the new packet.
  if (!conn_->version) {
    conn_->version = conn_->originalVersion;

    if (conn_->version == QuicVersion::MVFST_EXPERIMENTAL) {
      // MVFST_EXPERIMENTAL currently enables experimental congestion control
      // and experimental pacer. (here and in the server state machine)
      if (conn_->congestionController) {
        conn_->congestionController->setExperimental(true);
      }
      if (conn_->pacer) {
        conn_->pacer->setExperimental(true);
      }
    }
  }

  // First long-header packet while no server connection id is known: adopt
  // its source connection id as the server connection id, record it in
  // peerConnectionIds with kInitialSequenceNumber, and pass it to the read
  // codec.
  if (!conn_->serverConnectionId && longHeader) {
    conn_->serverConnectionId = longHeader->getSourceConnId();
    conn_->peerConnectionIds.emplace_back(
        longHeader->getSourceConnId(), kInitialSequenceNumber);
    conn_->readCodec->setServerConnectionId(*conn_->serverConnectionId);
  }

  // Error out if the connection id on the packet is not the one that is
  // expected.
  // Long headers are checked via their destination connection id, short
  // headers via their connection id; both are compared against
  // conn_->clientConnectionId.
  bool connidMatched = true;
  if ((longHeader &&
       longHeader->getDestinationConnId() != *conn_->clientConnectionId) ||
      (shortHeader &&
       shortHeader->getConnectionId() != *conn_->clientConnectionId)) {
    connidMatched = false;
  }
  if (!connidMatched) {
    throw QuicTransportException(
        "Invalid connection id", TransportErrorCode::PROTOCOL_VIOLATION);
  }
  // Ack state for this packet's number space. It is updated here, captured
  // by reference in the ack visitor below, and passed to
  // updateAckSendStateOnRecvPacket() at the end.
  auto& ackState = getAckState(*conn_, pnSpace);
  bool outOfOrder =
      updateLargestReceivedPacketNum(ackState, packetNum, receiveTimePoint);
  if (outOfOrder) {
    QUIC_STATS(conn_->statsCallback, onOutOfOrderPacketReceived);
  }

  // Accumulated over the frame loop and passed to
  // updateAckSendStateOnRecvPacket() at the end.
  bool pktHasRetransmittableData = false;
  bool pktHasCryptoData = false;

  // --- Per-frame processing ---
  for (auto& quicFrame : regularPacket.frames) {
    switch (quicFrame.type()) {
      case QuicFrame::Type::ReadAckFrame: {
        VLOG(10) << "Client received ack frame in packet=" << packetNum << " "
                 << *this;
        ReadAckFrame& ackFrame = *quicFrame.asReadAckFrame();
        // The result of processAckFrame() is appended to
        // conn_->lastProcessedAckEvents. The lambda is the visitor that
        // processAckFrame() is given for acked frames; it receives the
        // outstanding packet and one of its write frames (presumably once
        // per frame of each newly acked packet -- confirm against
        // processAckFrame).
        conn_->lastProcessedAckEvents.emplace_back(processAckFrame(
            *conn_,
            pnSpace,
            ackFrame,
            [&](const OutstandingPacket& outstandingPacket,
                const QuicWriteFrame& packetFrame,
                const ReadAckFrame&) {
              auto outstandingProtectionType =
                  outstandingPacket.packet.header.getProtectionType();
              if (outstandingProtectionType == ProtectionType::KeyPhaseZero) {
                // If we received an ack for data that we sent in 1-rtt from
                // the server, we can assume that the server had successfully
                // derived the 1-rtt keys and hence received the client
                // finished message. We can mark the handshake as confirmed and
                // drop the handshake cipher and outstanding packets after the
                // processing loop.
                conn_->handshakeLayer->handshakeConfirmed();
              }
              switch (packetFrame.type()) {
                case QuicWriteFrame::Type::WriteAckFrame: {
                  // The peer acked a packet of ours that carried an ack
                  // frame.
                  const WriteAckFrame& frame = *packetFrame.asWriteAckFrame();
                  DCHECK(!frame.ackBlocks.empty());
                  VLOG(4) << "Client received ack for largestAcked="
                          << frame.ackBlocks.front().end << " " << *this;
                  commonAckVisitorForAckFrame(ackState, frame);
                  break;
                }
                case QuicWriteFrame::Type::RstStreamFrame: {
                  const RstStreamFrame& frame = *packetFrame.asRstStreamFrame();
                  VLOG(4) << "Client received ack for reset frame stream="
                          << frame.streamId << " " << *this;

                  // getStream() may return null; in that case there is
                  // nothing to update.
                  auto stream = conn_->streamManager->getStream(frame.streamId);
                  if (stream) {
                    sendRstAckSMHandler(*stream);
                  }
                  break;
                }
                case QuicWriteFrame::Type::WriteStreamFrame: {
                  const WriteStreamFrame& frame =
                      *packetFrame.asWriteStreamFrame();

                  // A null stream is logged as "closed" and otherwise
                  // ignored.
                  auto ackedStream =
                      conn_->streamManager->getStream(frame.streamId);
                  VLOG(4) << "Client got ack for stream=" << frame.streamId
                          << " offset=" << frame.offset << " fin=" << frame.fin
                          << " data=" << frame.len
                          << " closed=" << (ackedStream == nullptr) << " "
                          << *this;
                  if (ackedStream) {
                    sendAckSMHandler(*ackedStream, frame);
                  }
                  break;
                }
                case QuicWriteFrame::Type::WriteCryptoFrame: {
                  // The crypto stream is selected by the protection type of
                  // the acked (outstanding) packet, not of the packet being
                  // processed now.
                  const WriteCryptoFrame& frame =
                      *packetFrame.asWriteCryptoFrame();
                  auto cryptoStream = getCryptoStream(
                      *conn_->cryptoState,
                      protectionTypeToEncryptionLevel(
                          outstandingProtectionType));
                  processCryptoStreamAck(
                      *cryptoStream, frame.offset, frame.len);
                  break;
                }
                case QuicWriteFrame::Type::PingFrame:
                  // Our ping was acked: flag the ping timeout for
                  // cancellation.
                  conn_->pendingEvents.cancelPingTimeout = true;
                  break;
                case QuicWriteFrame::Type::QuicSimpleFrame:
                default:
                  // ignore other frames.
                  break;
              }
            },
            markPacketLoss,
            receiveTimePoint));
        break;
      }
      case QuicFrame::Type::RstStreamFrame: {
        RstStreamFrame& frame = *quicFrame.asRstStreamFrame();
        VLOG(10) << "Client received reset stream=" << frame.streamId << " "
                 << *this;
        // Counted as retransmittable even if the stream lookup below fails.
        pktHasRetransmittableData = true;
        auto streamId = frame.streamId;
        auto stream = conn_->streamManager->getStream(streamId);
        if (!stream) {
          break;
        }
        receiveRstStreamSMHandler(*stream, frame);
        break;
      }
      case QuicFrame::Type::ReadCryptoFrame: {
        pktHasRetransmittableData = true;
        pktHasCryptoData = true;
        ReadCryptoFrame& cryptoFrame = *quicFrame.asReadCryptoFrame();
        VLOG(10) << "Client received crypto data offset=" << cryptoFrame.offset
                 << " len=" << cryptoFrame.data->computeChainDataLength()
                 << " packetNum=" << packetNum << " " << *this;
        // The data is only queued on the crypto stream for this packet's
        // encryption level here; it is read back and fed to the handshake
        // after the frame loop.
        appendDataToReadBuffer(
            *getCryptoStream(*conn_->cryptoState, encryptionLevel),
            StreamBuffer(
                std::move(cryptoFrame.data), cryptoFrame.offset, false));
        break;
      }
      case QuicFrame::Type::ReadStreamFrame: {
        ReadStreamFrame& frame = *quicFrame.asReadStreamFrame();
        VLOG(10) << "Client received stream data for stream=" << frame.streamId
                 << " offset=" << frame.offset
                 << " len=" << frame.data->computeChainDataLength()
                 << " fin=" << frame.fin << " packetNum=" << packetNum << " "
                 << *this;
        auto stream = conn_->streamManager->getStream(frame.streamId);
        // Counted as retransmittable even when the stream is not found.
        pktHasRetransmittableData = true;
        if (!stream) {
          VLOG(10) << "Could not find stream=" << frame.streamId << " "
                   << *conn_;
          break;
        }
        receiveReadStreamFrameSMHandler(*stream, std::move(frame));
        break;
      }
      case QuicFrame::Type::ReadNewTokenFrame: {
        // The token is moved out of the frame into a std::string, logged as
        // hex, and passed to newTokenCallback_ when one is set. This case
        // does not set pktHasRetransmittableData.
        ReadNewTokenFrame& newTokenFrame = *quicFrame.asReadNewTokenFrame();
        std::string tokenStr =
            newTokenFrame.token->moveToFbString().toStdString();
        VLOG(10) << "client received new token token="
                 << folly::hexlify(tokenStr);
        if (newTokenCallback_) {
          newTokenCallback_(std::move(tokenStr));
        }
        break;
      }
      case QuicFrame::Type::MaxDataFrame: {
        MaxDataFrame& connWindowUpdate = *quicFrame.asMaxDataFrame();
        VLOG(10) << "Client received max data offset="
                 << connWindowUpdate.maximumData << " " << *this;
        pktHasRetransmittableData = true;
        handleConnWindowUpdate(*conn_, connWindowUpdate, packetNum);
        break;
      }
      case QuicFrame::Type::MaxStreamDataFrame: {
        MaxStreamDataFrame& streamWindowUpdate =
            *quicFrame.asMaxStreamDataFrame();
        VLOG(10) << "Client received max stream data stream="
                 << streamWindowUpdate.streamId
                 << " offset=" << streamWindowUpdate.maximumData << " "
                 << *this;
        // A stream window update for a stream that isReceivingStream()
        // reports as receiving-only for this node is rejected as a stream
        // state error.
        if (isReceivingStream(conn_->nodeType, streamWindowUpdate.streamId)) {
          throw QuicTransportException(
              "Received MaxStreamDataFrame for receiving stream.",
              TransportErrorCode::STREAM_STATE_ERROR);
        }
        pktHasRetransmittableData = true;
        auto stream =
            conn_->streamManager->getStream(streamWindowUpdate.streamId);
        if (stream) {
          handleStreamWindowUpdate(
              *stream, streamWindowUpdate.maximumData, packetNum);
        }
        break;
      }
      case QuicFrame::Type::DataBlockedFrame: {
        VLOG(10) << "Client received blocked " << *this;
        pktHasRetransmittableData = true;
        handleConnBlocked(*conn_);
        break;
      }
      case QuicFrame::Type::StreamDataBlockedFrame: {
        // peer wishes to send data, but is unable to due to stream-level flow
        // control
        StreamDataBlockedFrame& blocked = *quicFrame.asStreamDataBlockedFrame();
        VLOG(10) << "Client received blocked stream=" << blocked.streamId << " "
                 << *this;
        pktHasRetransmittableData = true;
        auto stream = conn_->streamManager->getStream(blocked.streamId);
        if (stream) {
          handleStreamBlocked(*stream);
        }
        break;
      }
      case QuicFrame::Type::StreamsBlockedFrame: {
        // peer wishes to open a stream, but is unable to due to the maximum
        // stream limit set by us
        // Currently only logged; this case does not set
        // pktHasRetransmittableData.
        StreamsBlockedFrame& blocked = *quicFrame.asStreamsBlockedFrame();
        VLOG(10) << "Client received stream blocked limit="
                 << blocked.streamLimit << " " << *this;
        // TODO implement handler for it
        break;
      }
      case QuicFrame::Type::ConnectionCloseFrame: {
        ConnectionCloseFrame& connFrame = *quicFrame.asConnectionCloseFrame();
        auto errMsg = folly::to<std::string>(
            "Client closed by peer reason=", connFrame.reasonPhrase);
        VLOG(4) << errMsg << " " << *this;
        // we want to deliver app callbacks with the peer supplied error,
        // but send a NO_ERROR to the peer.
        if (conn_->qLogger) {
          conn_->qLogger->addTransportStateUpdate(getPeerClose(errMsg));
        }
        // errMsg is moved into the error only after the qlog call above has
        // used it.
        conn_->peerConnectionError =
            QuicError(QuicErrorCode(connFrame.errorCode), std::move(errMsg));
        throw QuicTransportException(
            "Peer closed", TransportErrorCode::NO_ERROR);
        // Unreachable: the throw above always leaves this case.
        break;
      }
      case QuicFrame::Type::PingFrame:
        // Ping isn't retransmittable. But we would like to ack them early.
        // So, make Ping frames count towards ack policy
        pktHasRetransmittableData = true;
        conn_->pendingEvents.notifyPingReceived = true;
        break;
      case QuicFrame::Type::PaddingFrame:
        // Padding requires no processing.
        break;
      case QuicFrame::Type::QuicSimpleFrame: {
        QuicSimpleFrame& simpleFrame = *quicFrame.asQuicSimpleFrame();
        pktHasRetransmittableData = true;
        updateSimpleFrameOnPacketReceived(
            *conn_, simpleFrame, packetNum, false);
        break;
      }
      case QuicFrame::Type::DatagramFrame: {
        DatagramFrame& frame = *quicFrame.asDatagramFrame();
        VLOG(10) << "Client received datagram data: "
                 << "len=" << frame.length << " " << *this;
        // Datagram isn't retransmittable. But we would like to ack them early.
        // So, make Datagram frames count towards ack policy
        pktHasRetransmittableData = true;
        handleDatagram(*conn_, frame, receiveTimePoint);
        break;
      }
      default:
        // Any other frame type is silently ignored.
        break;
    }
  }

  // --- Handshake progress ---
  // If the client handshake layer reports the Established phase while
  // hasInitialOrHandshakeCiphers() is still true, run handshakeConfirmed()
  // on the connection.
  auto handshakeLayer = clientConn_->clientHandshakeLayer;
  if (handshakeLayer->getPhase() == ClientHandshake::Phase::Established &&
      hasInitialOrHandshakeCiphers(*conn_)) {
    handshakeConfirmed(*conn_);
  }

  // Try reading bytes off of crypto, and performing a handshake.
  // Only the crypto stream for this packet's encryption level is read.
  auto cryptoData = readDataFromCryptoStream(
      *getCryptoStream(*conn_->cryptoState, encryptionLevel));
  if (cryptoData) {
    // Remember whether the 1-RTT write cipher existed before doHandshake(),
    // so that its appearance during this call can be detected.
    bool hadOneRttKey = conn_->oneRttWriteCipher != nullptr;
    handshakeLayer->doHandshake(std::move(cryptoData), encryptionLevel);
    bool oneRttKeyDerivationTriggered = false;
    if (!hadOneRttKey && conn_->oneRttWriteCipher) {
      oneRttKeyDerivationTriggered = true;
      updatePacingOnKeyEstablished(*conn_);
    }
    // Once both the 1-RTT write cipher and the codec's 1-RTT read cipher
    // exist, the 0-RTT write ciphers are dropped.
    if (conn_->oneRttWriteCipher && conn_->readCodec->getOneRttReadCipher()) {
      clientConn_->zeroRttWriteCipher.reset();
      clientConn_->zeroRttWriteHeaderCipher.reset();
    }
    // zeroRttRejected is tri-state: holds true (rejected), holds false
    // (accepted), or holds no value (treated below as "0-RTT never
    // attempted").
    // NOTE(review): the qlog/stat calls below run on every invocation that
    // reads crypto data while a value is present, so they may be emitted
    // more than once per connection -- confirm.
    auto zeroRttRejected = handshakeLayer->getZeroRttRejected();
    if (zeroRttRejected.has_value() && *zeroRttRejected) {
      if (conn_->qLogger) {
        conn_->qLogger->addTransportStateUpdate(kZeroRttRejected);
      }
      QUIC_STATS(conn_->statsCallback, onZeroRttRejected);
      handshakeLayer->removePsk(hostname_);
    } else if (zeroRttRejected.has_value()) {
      if (conn_->qLogger) {
        conn_->qLogger->addTransportStateUpdate(kZeroRttAccepted);
      }
      QUIC_STATS(conn_->statsCallback, onZeroRttAccepted);
      conn_->usedZeroRtt = true;
    }
    // We should get transport parameters if we've derived 1-rtt keys and 0-rtt
    // was rejected, or we have derived 1-rtt keys and 0-rtt was never
    // attempted.
    if (oneRttKeyDerivationTriggered) {
      auto serverParams = handshakeLayer->getServerTransportParams();
      if (!serverParams) {
        throw QuicTransportException(
            "No server transport params",
            TransportErrorCode::TRANSPORT_PARAMETER_ERROR);
      }
      // Taken when 0-RTT was rejected or never attempted, i.e. in every case
      // except "0-RTT accepted".
      if ((zeroRttRejected.has_value() && *zeroRttRejected) ||
          !zeroRttRejected.has_value()) {
        // Snapshot the flow control limits in effect before the server's
        // parameters are applied; they are compared against the new values
        // in the rejection check below.
        auto originalPeerMaxOffset =
            conn_->flowControlState.peerAdvertisedMaxOffset;
        auto originalPeerInitialStreamOffsetBidiLocal =
            conn_->flowControlState
                .peerAdvertisedInitialMaxStreamOffsetBidiLocal;
        auto originalPeerInitialStreamOffsetBidiRemote =
            conn_->flowControlState
                .peerAdvertisedInitialMaxStreamOffsetBidiRemote;
        auto originalPeerInitialStreamOffsetUni =
            conn_->flowControlState.peerAdvertisedInitialMaxStreamOffsetUni;
        VLOG(10) << "Client negotiated transport params " << *this;
        // The stream limits are read before *serverParams is moved below.
        auto maxStreamsBidi = getIntegerParameter(
            TransportParameterId::initial_max_streams_bidi,
            serverParams->parameters);
        auto maxStreamsUni = getIntegerParameter(
            TransportParameterId::initial_max_streams_uni,
            serverParams->parameters);
        // NOTE(review): *serverParams is moved here, yet
        // serverParams->parameters is read again further down when looking
        // up the stateless reset token (if it is still unset) -- confirm the
        // moved-from state is safe to read or that the token is always set
        // by this call.
        processServerInitialParams(
            *clientConn_, std::move(*serverParams), packetNum);

        // Cache the flow control values now held in flowControlState
        // together with the stream limits read above (0 when a limit was
        // absent).
        cacheServerInitialParams(
            *clientConn_,
            conn_->flowControlState.peerAdvertisedMaxOffset,
            conn_->flowControlState
                .peerAdvertisedInitialMaxStreamOffsetBidiLocal,
            conn_->flowControlState
                .peerAdvertisedInitialMaxStreamOffsetBidiRemote,
            conn_->flowControlState.peerAdvertisedInitialMaxStreamOffsetUni,
            maxStreamsBidi.value_or(0),
            maxStreamsUni.value_or(0));

        if (zeroRttRejected.has_value() && *zeroRttRejected) {
          // Verify that the new flow control parameters are >= the original
          // transport parameters that were used. This is the easy case. If
          // any flow control limit decreased there is no recovery here and
          // the app needs to retry the connection. The other parameters can
          // be updated.
          // TODO: implement undo transport state on retry.
          if (originalPeerMaxOffset >
                  conn_->flowControlState.peerAdvertisedMaxOffset ||
              originalPeerInitialStreamOffsetBidiLocal >
                  conn_->flowControlState
                      .peerAdvertisedInitialMaxStreamOffsetBidiLocal ||
              originalPeerInitialStreamOffsetBidiRemote >
                  conn_->flowControlState
                      .peerAdvertisedInitialMaxStreamOffsetBidiRemote ||

              originalPeerInitialStreamOffsetUni >
                  conn_->flowControlState
                      .peerAdvertisedInitialMaxStreamOffsetUni) {
            throw QuicTransportException(
                "Rejection of zero rtt parameters unsupported",
                TransportErrorCode::TRANSPORT_PARAMETER_ERROR);
          }
        }
      }
      // TODO This is an unfortunate workaround: manually update the max
      // packet size until we fix 0-rtt transport parameters.
      // Only taken when 0-RTT was accepted, i.e. when the branch above (and
      // with it processServerInitialParams) was skipped. The value is
      // clamped to [kDefaultUDPSendPacketLen, kDefaultMaxUDPPayload].
      if (conn_->transportSettings.canIgnorePathMTU &&
          zeroRttRejected.has_value() && !*zeroRttRejected) {
        auto updatedPacketSize = getIntegerParameter(
            TransportParameterId::max_packet_size, serverParams->parameters);
        updatedPacketSize = std::max<uint64_t>(
            updatedPacketSize.value_or(kDefaultUDPSendPacketLen),
            kDefaultUDPSendPacketLen);
        updatedPacketSize =
            std::min<uint64_t>(*updatedPacketSize, kDefaultMaxUDPPayload);
        conn_->udpSendPacketLen = *updatedPacketSize;
      }

      // TODO this is another bandaid. Explicitly set the stateless reset token
      // or else conns that use 0-RTT won't be able to parse stateless resets.
      if (!clientConn_->statelessResetToken) {
        clientConn_->statelessResetToken =
            getStatelessResetTokenParameter(serverParams->parameters);
      }
      // Pass the token to the read codec when one is available.
      if (clientConn_->statelessResetToken) {
        conn_->readCodec->setStatelessResetToken(
            clientConn_->statelessResetToken.value());
      }
    }

    if (zeroRttRejected.has_value() && *zeroRttRejected) {
      // TODO: Make sure the alpn is the same, if not then do a full undo of the
      // state.
      // 0-RTT was rejected: drop the 0-RTT write ciphers and mark the 0-RTT
      // packets as lost via markZeroRttPacketsLost().
      clientConn_->zeroRttWriteCipher.reset();
      clientConn_->zeroRttWriteHeaderCipher.reset();
      markZeroRttPacketsLost(*conn_, markPacketLoss);
    }
  }
  // --- Ack scheduling and initial key discard ---
  updateAckSendStateOnRecvPacket(
      *conn_,
      ackState,
      outOfOrder,
      pktHasRetransmittableData,
      pktHasCryptoData);
  // A Handshake-level packet was processed while the initial write cipher
  // still exists: drop the initial write/header ciphers, clear the codec's
  // initial read/header ciphers, and implicitly ack the Initial crypto
  // stream.
  if (encryptionLevel == EncryptionLevel::Handshake &&
      conn_->initialWriteCipher) {
    conn_->initialWriteCipher.reset();
    conn_->initialHeaderCipher.reset();
    conn_->readCodec->setInitialReadCipher(nullptr);
    conn_->readCodec->setInitialHeaderCipher(nullptr);
    implicitAckCryptoStream(*conn_, EncryptionLevel::Initial);
  }
}