in quic/client/QuicClientTransport.cpp [175:770]
void QuicClientTransport::processPacketData(
const folly::SocketAddress& peer,
TimePoint receiveTimePoint,
BufQueue& packetQueue) {
auto packetSize = packetQueue.chainLength();
if (packetSize == 0) {
return;
}
auto parsedPacket = conn_->readCodec->parsePacket(
packetQueue, conn_->ackStates, conn_->clientConnectionId->size());
StatelessReset* statelessReset = parsedPacket.statelessReset();
if (statelessReset) {
const auto& token = clientConn_->statelessResetToken;
if (statelessReset->token == token) {
VLOG(4) << "Received Stateless Reset " << *this;
conn_->peerConnectionError = QuicError(
QuicErrorCode(LocalErrorCode::CONNECTION_RESET),
toString(LocalErrorCode::CONNECTION_RESET).str());
throw QuicInternalException("Peer reset", LocalErrorCode::NO_ERROR);
}
VLOG(4) << "Drop StatelessReset for bad connId or token " << *this;
}
RetryPacket* retryPacket = parsedPacket.retryPacket();
if (retryPacket) {
if (conn_->qLogger) {
conn_->qLogger->addPacket(*retryPacket, packetSize, true);
}
if (!clientConn_->retryToken.empty()) {
VLOG(4) << "Server sent more than one retry packet";
return;
}
const ConnectionId* originalDstConnId =
&(*clientConn_->originalDestinationConnectionId);
if (!clientConn_->clientHandshakeLayer->verifyRetryIntegrityTag(
*originalDstConnId, *retryPacket)) {
VLOG(4) << "The integrity tag in the retry packet was invalid. "
<< "Dropping bad retry packet.";
return;
}
if (happyEyeballsEnabled_) {
happyEyeballsOnDataReceived(
*clientConn_, happyEyeballsConnAttemptDelayTimeout_, socket_, peer);
}
// Set the destination connection ID to be the value from the source
// connection id of the retry packet
clientConn_->initialDestinationConnectionId =
retryPacket->header.getSourceConnId();
auto released = static_cast<QuicClientConnectionState*>(conn_.release());
std::unique_ptr<QuicClientConnectionState> uniqueClient(released);
auto tempConn = undoAllClientStateForRetry(std::move(uniqueClient));
clientConn_ = tempConn.get();
conn_.reset(tempConn.release());
clientConn_->retryToken = retryPacket->header.getToken();
// TODO (amsharma): add a "RetryPacket" QLog event, and log it here.
// TODO (amsharma): verify the "original_connection_id" parameter
// upon receiving a subsequent initial from the server.
startCryptoHandshake();
return;
}
auto cipherUnavailable = parsedPacket.cipherUnavailable();
if (cipherUnavailable && cipherUnavailable->packet &&
!cipherUnavailable->packet->empty() &&
(cipherUnavailable->protectionType == ProtectionType::KeyPhaseZero ||
cipherUnavailable->protectionType == ProtectionType::Handshake) &&
clientConn_->pendingOneRttData.size() +
clientConn_->pendingHandshakeData.size() <
clientConn_->transportSettings.maxPacketsToBuffer) {
auto& pendingData =
cipherUnavailable->protectionType == ProtectionType::KeyPhaseZero
? clientConn_->pendingOneRttData
: clientConn_->pendingHandshakeData;
pendingData.emplace_back(
NetworkDataSingle(
std::move(cipherUnavailable->packet), receiveTimePoint),
peer);
if (conn_->qLogger) {
conn_->qLogger->addPacketBuffered(
cipherUnavailable->protectionType, packetSize);
}
return;
}
RegularQuicPacket* regularOptional = parsedPacket.regularPacket();
if (!regularOptional) {
QUIC_STATS(statsCallback_, onPacketDropped, PacketDropReason::PARSE_ERROR);
if (conn_->qLogger) {
conn_->qLogger->addPacketDrop(packetSize, kParse);
}
return;
}
if (regularOptional->frames.empty()) {
// This is either a packet that has no data (long-header parsed but no data
// found) or a regular packet with a short header and no frames. Both are
// protocol violations.
QUIC_STATS(
conn_->statsCallback,
onPacketDropped,
PacketDropReason::PROTOCOL_VIOLATION);
if (conn_->qLogger) {
conn_->qLogger->addPacketDrop(
packetSize,
QuicTransportStatsCallback::toString(
PacketDropReason::PROTOCOL_VIOLATION));
}
throw QuicTransportException(
"Packet has no frames", TransportErrorCode::PROTOCOL_VIOLATION);
}
if (happyEyeballsEnabled_) {
CHECK(socket_);
happyEyeballsOnDataReceived(
*clientConn_, happyEyeballsConnAttemptDelayTimeout_, socket_, peer);
}
LongHeader* longHeader = regularOptional->header.asLong();
ShortHeader* shortHeader = regularOptional->header.asShort();
auto protectionLevel = regularOptional->header.getProtectionType();
auto encryptionLevel = protectionTypeToEncryptionLevel(protectionLevel);
auto packetNum = regularOptional->header.getPacketSequenceNum();
auto pnSpace = regularOptional->header.getPacketNumberSpace();
bool isProtectedPacket = protectionLevel == ProtectionType::KeyPhaseZero ||
protectionLevel == ProtectionType::KeyPhaseOne;
auto& regularPacket = *regularOptional;
if (conn_->qLogger) {
conn_->qLogger->addPacket(regularPacket, packetSize);
}
if (!isProtectedPacket) {
for (auto& quicFrame : regularPacket.frames) {
auto isPadding = quicFrame.asPaddingFrame();
auto isAck = quicFrame.asReadAckFrame();
auto isClose = quicFrame.asConnectionCloseFrame();
auto isCrypto = quicFrame.asReadCryptoFrame();
auto isPing = quicFrame.asPingFrame();
// TODO: add path challenge and response
if (!isPadding && !isAck && !isClose && !isCrypto && !isPing) {
throw QuicTransportException(
"Invalid frame", TransportErrorCode::PROTOCOL_VIOLATION);
}
}
}
// We got a packet that was not the version negotiation packet, that means
// that the version is now bound to the new packet.
if (!conn_->version) {
conn_->version = conn_->originalVersion;
if (conn_->version == QuicVersion::MVFST_EXPERIMENTAL) {
// MVFST_EXPERIMENTAL currently enables experimental congestion control
// and experimental pacer. (here and in the server state machine)
if (conn_->congestionController) {
conn_->congestionController->setExperimental(true);
}
if (conn_->pacer) {
conn_->pacer->setExperimental(true);
}
}
}
if (!conn_->serverConnectionId && longHeader) {
conn_->serverConnectionId = longHeader->getSourceConnId();
conn_->peerConnectionIds.emplace_back(
longHeader->getSourceConnId(), kInitialSequenceNumber);
conn_->readCodec->setServerConnectionId(*conn_->serverConnectionId);
}
// Error out if the connection id on the packet is not the one that is
// expected.
bool connidMatched = true;
if ((longHeader &&
longHeader->getDestinationConnId() != *conn_->clientConnectionId) ||
(shortHeader &&
shortHeader->getConnectionId() != *conn_->clientConnectionId)) {
connidMatched = false;
}
if (!connidMatched) {
throw QuicTransportException(
"Invalid connection id", TransportErrorCode::PROTOCOL_VIOLATION);
}
auto& ackState = getAckState(*conn_, pnSpace);
bool outOfOrder =
updateLargestReceivedPacketNum(ackState, packetNum, receiveTimePoint);
if (outOfOrder) {
QUIC_STATS(conn_->statsCallback, onOutOfOrderPacketReceived);
}
bool pktHasRetransmittableData = false;
bool pktHasCryptoData = false;
for (auto& quicFrame : regularPacket.frames) {
switch (quicFrame.type()) {
case QuicFrame::Type::ReadAckFrame: {
VLOG(10) << "Client received ack frame in packet=" << packetNum << " "
<< *this;
ReadAckFrame& ackFrame = *quicFrame.asReadAckFrame();
conn_->lastProcessedAckEvents.emplace_back(processAckFrame(
*conn_,
pnSpace,
ackFrame,
[&](const OutstandingPacket& outstandingPacket,
const QuicWriteFrame& packetFrame,
const ReadAckFrame&) {
auto outstandingProtectionType =
outstandingPacket.packet.header.getProtectionType();
if (outstandingProtectionType == ProtectionType::KeyPhaseZero) {
// If we received an ack for data that we sent in 1-rtt from
// the server, we can assume that the server had successfully
// derived the 1-rtt keys and hence received the client
// finished message. We can mark the handshake as confirmed and
// drop the handshake cipher and outstanding packets after the
// processing loop.
conn_->handshakeLayer->handshakeConfirmed();
}
switch (packetFrame.type()) {
case QuicWriteFrame::Type::WriteAckFrame: {
const WriteAckFrame& frame = *packetFrame.asWriteAckFrame();
DCHECK(!frame.ackBlocks.empty());
VLOG(4) << "Client received ack for largestAcked="
<< frame.ackBlocks.front().end << " " << *this;
commonAckVisitorForAckFrame(ackState, frame);
break;
}
case QuicWriteFrame::Type::RstStreamFrame: {
const RstStreamFrame& frame = *packetFrame.asRstStreamFrame();
VLOG(4) << "Client received ack for reset frame stream="
<< frame.streamId << " " << *this;
auto stream = conn_->streamManager->getStream(frame.streamId);
if (stream) {
sendRstAckSMHandler(*stream);
}
break;
}
case QuicWriteFrame::Type::WriteStreamFrame: {
const WriteStreamFrame& frame =
*packetFrame.asWriteStreamFrame();
auto ackedStream =
conn_->streamManager->getStream(frame.streamId);
VLOG(4) << "Client got ack for stream=" << frame.streamId
<< " offset=" << frame.offset << " fin=" << frame.fin
<< " data=" << frame.len
<< " closed=" << (ackedStream == nullptr) << " "
<< *this;
if (ackedStream) {
sendAckSMHandler(*ackedStream, frame);
}
break;
}
case QuicWriteFrame::Type::WriteCryptoFrame: {
const WriteCryptoFrame& frame =
*packetFrame.asWriteCryptoFrame();
auto cryptoStream = getCryptoStream(
*conn_->cryptoState,
protectionTypeToEncryptionLevel(
outstandingProtectionType));
processCryptoStreamAck(
*cryptoStream, frame.offset, frame.len);
break;
}
case QuicWriteFrame::Type::PingFrame:
conn_->pendingEvents.cancelPingTimeout = true;
break;
case QuicWriteFrame::Type::QuicSimpleFrame:
default:
// ignore other frames.
break;
}
},
markPacketLoss,
receiveTimePoint));
break;
}
case QuicFrame::Type::RstStreamFrame: {
RstStreamFrame& frame = *quicFrame.asRstStreamFrame();
VLOG(10) << "Client received reset stream=" << frame.streamId << " "
<< *this;
pktHasRetransmittableData = true;
auto streamId = frame.streamId;
auto stream = conn_->streamManager->getStream(streamId);
if (!stream) {
break;
}
receiveRstStreamSMHandler(*stream, frame);
break;
}
case QuicFrame::Type::ReadCryptoFrame: {
pktHasRetransmittableData = true;
pktHasCryptoData = true;
ReadCryptoFrame& cryptoFrame = *quicFrame.asReadCryptoFrame();
VLOG(10) << "Client received crypto data offset=" << cryptoFrame.offset
<< " len=" << cryptoFrame.data->computeChainDataLength()
<< " packetNum=" << packetNum << " " << *this;
appendDataToReadBuffer(
*getCryptoStream(*conn_->cryptoState, encryptionLevel),
StreamBuffer(
std::move(cryptoFrame.data), cryptoFrame.offset, false));
break;
}
case QuicFrame::Type::ReadStreamFrame: {
ReadStreamFrame& frame = *quicFrame.asReadStreamFrame();
VLOG(10) << "Client received stream data for stream=" << frame.streamId
<< " offset=" << frame.offset
<< " len=" << frame.data->computeChainDataLength()
<< " fin=" << frame.fin << " packetNum=" << packetNum << " "
<< *this;
auto stream = conn_->streamManager->getStream(frame.streamId);
pktHasRetransmittableData = true;
if (!stream) {
VLOG(10) << "Could not find stream=" << frame.streamId << " "
<< *conn_;
break;
}
receiveReadStreamFrameSMHandler(*stream, std::move(frame));
break;
}
case QuicFrame::Type::ReadNewTokenFrame: {
ReadNewTokenFrame& newTokenFrame = *quicFrame.asReadNewTokenFrame();
std::string tokenStr =
newTokenFrame.token->moveToFbString().toStdString();
VLOG(10) << "client received new token token="
<< folly::hexlify(tokenStr);
if (newTokenCallback_) {
newTokenCallback_(std::move(tokenStr));
}
break;
}
case QuicFrame::Type::MaxDataFrame: {
MaxDataFrame& connWindowUpdate = *quicFrame.asMaxDataFrame();
VLOG(10) << "Client received max data offset="
<< connWindowUpdate.maximumData << " " << *this;
pktHasRetransmittableData = true;
handleConnWindowUpdate(*conn_, connWindowUpdate, packetNum);
break;
}
case QuicFrame::Type::MaxStreamDataFrame: {
MaxStreamDataFrame& streamWindowUpdate =
*quicFrame.asMaxStreamDataFrame();
VLOG(10) << "Client received max stream data stream="
<< streamWindowUpdate.streamId
<< " offset=" << streamWindowUpdate.maximumData << " "
<< *this;
if (isReceivingStream(conn_->nodeType, streamWindowUpdate.streamId)) {
throw QuicTransportException(
"Received MaxStreamDataFrame for receiving stream.",
TransportErrorCode::STREAM_STATE_ERROR);
}
pktHasRetransmittableData = true;
auto stream =
conn_->streamManager->getStream(streamWindowUpdate.streamId);
if (stream) {
handleStreamWindowUpdate(
*stream, streamWindowUpdate.maximumData, packetNum);
}
break;
}
case QuicFrame::Type::DataBlockedFrame: {
VLOG(10) << "Client received blocked " << *this;
pktHasRetransmittableData = true;
handleConnBlocked(*conn_);
break;
}
case QuicFrame::Type::StreamDataBlockedFrame: {
// peer wishes to send data, but is unable to due to stream-level flow
// control
StreamDataBlockedFrame& blocked = *quicFrame.asStreamDataBlockedFrame();
VLOG(10) << "Client received blocked stream=" << blocked.streamId << " "
<< *this;
pktHasRetransmittableData = true;
auto stream = conn_->streamManager->getStream(blocked.streamId);
if (stream) {
handleStreamBlocked(*stream);
}
break;
}
case QuicFrame::Type::StreamsBlockedFrame: {
// peer wishes to open a stream, but is unable to due to the maximum
// stream limit set by us
StreamsBlockedFrame& blocked = *quicFrame.asStreamsBlockedFrame();
VLOG(10) << "Client received stream blocked limit="
<< blocked.streamLimit << " " << *this;
// TODO implement handler for it
break;
}
case QuicFrame::Type::ConnectionCloseFrame: {
ConnectionCloseFrame& connFrame = *quicFrame.asConnectionCloseFrame();
auto errMsg = folly::to<std::string>(
"Client closed by peer reason=", connFrame.reasonPhrase);
VLOG(4) << errMsg << " " << *this;
// we want to deliver app callbacks with the peer supplied error,
// but send a NO_ERROR to the peer.
if (conn_->qLogger) {
conn_->qLogger->addTransportStateUpdate(getPeerClose(errMsg));
}
conn_->peerConnectionError =
QuicError(QuicErrorCode(connFrame.errorCode), std::move(errMsg));
throw QuicTransportException(
"Peer closed", TransportErrorCode::NO_ERROR);
break;
}
case QuicFrame::Type::PingFrame:
// Ping isn't retransmittable. But we would like to ack them early.
// So, make Ping frames count towards ack policy
pktHasRetransmittableData = true;
conn_->pendingEvents.notifyPingReceived = true;
break;
case QuicFrame::Type::PaddingFrame:
break;
case QuicFrame::Type::QuicSimpleFrame: {
QuicSimpleFrame& simpleFrame = *quicFrame.asQuicSimpleFrame();
pktHasRetransmittableData = true;
updateSimpleFrameOnPacketReceived(
*conn_, simpleFrame, packetNum, false);
break;
}
case QuicFrame::Type::DatagramFrame: {
DatagramFrame& frame = *quicFrame.asDatagramFrame();
VLOG(10) << "Client received datagram data: "
<< "len=" << frame.length << " " << *this;
// Datagram isn't retransmittable. But we would like to ack them early.
// So, make Datagram frames count towards ack policy
pktHasRetransmittableData = true;
handleDatagram(*conn_, frame, receiveTimePoint);
break;
}
default:
break;
}
}
auto handshakeLayer = clientConn_->clientHandshakeLayer;
if (handshakeLayer->getPhase() == ClientHandshake::Phase::Established &&
hasInitialOrHandshakeCiphers(*conn_)) {
handshakeConfirmed(*conn_);
}
// Try reading bytes off of crypto, and performing a handshake.
auto cryptoData = readDataFromCryptoStream(
*getCryptoStream(*conn_->cryptoState, encryptionLevel));
if (cryptoData) {
bool hadOneRttKey = conn_->oneRttWriteCipher != nullptr;
handshakeLayer->doHandshake(std::move(cryptoData), encryptionLevel);
bool oneRttKeyDerivationTriggered = false;
if (!hadOneRttKey && conn_->oneRttWriteCipher) {
oneRttKeyDerivationTriggered = true;
updatePacingOnKeyEstablished(*conn_);
}
if (conn_->oneRttWriteCipher && conn_->readCodec->getOneRttReadCipher()) {
clientConn_->zeroRttWriteCipher.reset();
clientConn_->zeroRttWriteHeaderCipher.reset();
}
auto zeroRttRejected = handshakeLayer->getZeroRttRejected();
if (zeroRttRejected.has_value() && *zeroRttRejected) {
if (conn_->qLogger) {
conn_->qLogger->addTransportStateUpdate(kZeroRttRejected);
}
QUIC_STATS(conn_->statsCallback, onZeroRttRejected);
handshakeLayer->removePsk(hostname_);
} else if (zeroRttRejected.has_value()) {
if (conn_->qLogger) {
conn_->qLogger->addTransportStateUpdate(kZeroRttAccepted);
}
QUIC_STATS(conn_->statsCallback, onZeroRttAccepted);
conn_->usedZeroRtt = true;
}
// We should get transport parameters if we've derived 1-rtt keys and 0-rtt
// was rejected, or we have derived 1-rtt keys and 0-rtt was never
// attempted.
if (oneRttKeyDerivationTriggered) {
auto serverParams = handshakeLayer->getServerTransportParams();
if (!serverParams) {
throw QuicTransportException(
"No server transport params",
TransportErrorCode::TRANSPORT_PARAMETER_ERROR);
}
if ((zeroRttRejected.has_value() && *zeroRttRejected) ||
!zeroRttRejected.has_value()) {
auto originalPeerMaxOffset =
conn_->flowControlState.peerAdvertisedMaxOffset;
auto originalPeerInitialStreamOffsetBidiLocal =
conn_->flowControlState
.peerAdvertisedInitialMaxStreamOffsetBidiLocal;
auto originalPeerInitialStreamOffsetBidiRemote =
conn_->flowControlState
.peerAdvertisedInitialMaxStreamOffsetBidiRemote;
auto originalPeerInitialStreamOffsetUni =
conn_->flowControlState.peerAdvertisedInitialMaxStreamOffsetUni;
VLOG(10) << "Client negotiated transport params " << *this;
auto maxStreamsBidi = getIntegerParameter(
TransportParameterId::initial_max_streams_bidi,
serverParams->parameters);
auto maxStreamsUni = getIntegerParameter(
TransportParameterId::initial_max_streams_uni,
serverParams->parameters);
processServerInitialParams(
*clientConn_, std::move(*serverParams), packetNum);
cacheServerInitialParams(
*clientConn_,
conn_->flowControlState.peerAdvertisedMaxOffset,
conn_->flowControlState
.peerAdvertisedInitialMaxStreamOffsetBidiLocal,
conn_->flowControlState
.peerAdvertisedInitialMaxStreamOffsetBidiRemote,
conn_->flowControlState.peerAdvertisedInitialMaxStreamOffsetUni,
maxStreamsBidi.value_or(0),
maxStreamsUni.value_or(0));
if (zeroRttRejected.has_value() && *zeroRttRejected) {
// verify that the new flow control parameters are >= the original
// transport parameters that were use. This is the easy case. If the
// flow control decreases then we are just screwed and we need to have
// the app retry the connection. The other parameters can be updated.
// TODO: implement undo transport state on retry.
if (originalPeerMaxOffset >
conn_->flowControlState.peerAdvertisedMaxOffset ||
originalPeerInitialStreamOffsetBidiLocal >
conn_->flowControlState
.peerAdvertisedInitialMaxStreamOffsetBidiLocal ||
originalPeerInitialStreamOffsetBidiRemote >
conn_->flowControlState
.peerAdvertisedInitialMaxStreamOffsetBidiRemote ||
originalPeerInitialStreamOffsetUni >
conn_->flowControlState
.peerAdvertisedInitialMaxStreamOffsetUni) {
throw QuicTransportException(
"Rejection of zero rtt parameters unsupported",
TransportErrorCode::TRANSPORT_PARAMETER_ERROR);
}
}
}
// TODO This sucks, but manually update the max packet size until we fix
// 0-rtt transport parameters.
if (conn_->transportSettings.canIgnorePathMTU &&
zeroRttRejected.has_value() && !*zeroRttRejected) {
auto updatedPacketSize = getIntegerParameter(
TransportParameterId::max_packet_size, serverParams->parameters);
updatedPacketSize = std::max<uint64_t>(
updatedPacketSize.value_or(kDefaultUDPSendPacketLen),
kDefaultUDPSendPacketLen);
updatedPacketSize =
std::min<uint64_t>(*updatedPacketSize, kDefaultMaxUDPPayload);
conn_->udpSendPacketLen = *updatedPacketSize;
}
// TODO this is another bandaid. Explicitly set the stateless reset token
// or else conns that use 0-RTT won't be able to parse stateless resets.
if (!clientConn_->statelessResetToken) {
clientConn_->statelessResetToken =
getStatelessResetTokenParameter(serverParams->parameters);
}
if (clientConn_->statelessResetToken) {
conn_->readCodec->setStatelessResetToken(
clientConn_->statelessResetToken.value());
}
}
if (zeroRttRejected.has_value() && *zeroRttRejected) {
// TODO: Make sure the alpn is the same, if not then do a full undo of the
// state.
clientConn_->zeroRttWriteCipher.reset();
clientConn_->zeroRttWriteHeaderCipher.reset();
markZeroRttPacketsLost(*conn_, markPacketLoss);
}
}
updateAckSendStateOnRecvPacket(
*conn_,
ackState,
outOfOrder,
pktHasRetransmittableData,
pktHasCryptoData);
if (encryptionLevel == EncryptionLevel::Handshake &&
conn_->initialWriteCipher) {
conn_->initialWriteCipher.reset();
conn_->initialHeaderCipher.reset();
conn_->readCodec->setInitialReadCipher(nullptr);
conn_->readCodec->setInitialHeaderCipher(nullptr);
implicitAckCryptoStream(*conn_, EncryptionLevel::Initial);
}
}