in quic/state/AckHandlers.cpp [50:475]
AckEvent processAckFrame(
QuicConnectionStateBase& conn,
PacketNumberSpace pnSpace,
const ReadAckFrame& frame,
const AckVisitor& ackVisitor,
const LossVisitor& lossVisitor,
const TimePoint& ackReceiveTime) {
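  // Overall flow: (1) walk the ACK blocks and outstanding packets from the
  // highest packet numbers downwards, pulling newly acked packets out of
  // conn.outstandings and updating RTT state; (2) run the AckVisitor over
  // the collected packets in sent order; (3) run loss detection and feed
  // the result, together with the AckEvent, to the congestion controller.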
const auto nowTime = Clock::now();
// TODO: send error if we get an ack for a packet we've not sent t18721184
auto ack = AckEvent::Builder()
.setAckTime(ackReceiveTime)
.setAdjustedAckTime(ackReceiveTime - frame.ackDelay)
.setAckDelay(frame.ackDelay)
.setPacketNumberSpace(pnSpace)
.setLargestAckedPacket(frame.largestAcked)
.setIsImplicitAck(frame.implicit)
.build();
// temporary storage to enable packets to be processed in sent order
std::deque<OutstandingPacketWithHandlerContext> packetsWithHandlerContext;
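  // Start from the most recently sent outstanding packet in this packet
  // number space (including packets already declared lost) and walk towards
  // older packets.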
auto currentPacketIt = getLastOutstandingPacketIncludingLost(conn, pnSpace);
uint64_t dsrPacketsAcked = 0;
folly::Optional<decltype(conn.lossState.lastAckedPacketSentTime)>
lastAckedPacketSentTime;
folly::Optional<Observer::SpuriousLossEvent> spuriousLossEvent;
// Used for debug only.
const auto originalPacketCount = conn.outstandings.packetCount;
if (conn.observers->size() > 0) {
spuriousLossEvent.emplace(ackReceiveTime);
}
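  // ACK blocks are ordered from the highest packet range downwards, matching
  // the reverse traversal of the outstanding packets, so both can be walked
  // together in a single pass.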
auto ackBlockIt = frame.ackBlocks.cbegin();
while (ackBlockIt != frame.ackBlocks.cend() &&
currentPacketIt != conn.outstandings.packets.rend()) {
    // In reverse order, find the first outstanding packet whose packet number
    // is less than or equal to the endPacket of the current ack range.
auto rPacketIt = std::lower_bound(
currentPacketIt,
conn.outstandings.packets.rend(),
ackBlockIt->endPacket,
[&](const auto& packetWithTime, const auto& val) {
return packetWithTime.packet.header.getPacketSequenceNum() > val;
});
if (rPacketIt == conn.outstandings.packets.rend()) {
      // All remaining outstanding packets have packet numbers greater than
      // this block's end packet. Since the ACK blocks are iterated in
      // descending order of end packets, our work here is done.
VLOG(10) << __func__ << " less than all outstanding packets outstanding="
<< conn.outstandings.numOutstanding() << " range=["
<< ackBlockIt->startPacket << ", " << ackBlockIt->endPacket
<< "]"
<< " " << conn;
ackBlockIt++;
break;
}
auto eraseEnd = rPacketIt;
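    // Walk backwards (towards older packets) through the outstanding packets
    // that fall within this ack block.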
while (rPacketIt != conn.outstandings.packets.rend()) {
auto currentPacketNum = rPacketIt->packet.header.getPacketSequenceNum();
auto currentPacketNumberSpace =
rPacketIt->packet.header.getPacketNumberSpace();
if (pnSpace != currentPacketNumberSpace) {
        // When the next packet is not in the same packet number space, skip
        // it in the current ack processing. If the iterator has moved, we
        // have found packets in the current space that are acked by this ack
        // block, so erase the current iterator range and move the iterator to
        // the next search point.
if (rPacketIt != eraseEnd) {
auto nextElem = conn.outstandings.packets.erase(
rPacketIt.base(), eraseEnd.base());
rPacketIt = std::reverse_iterator<decltype(nextElem)>(nextElem) + 1;
eraseEnd = rPacketIt;
} else {
rPacketIt++;
eraseEnd = rPacketIt;
}
continue;
}
if (currentPacketNum < ackBlockIt->startPacket) {
break;
}
VLOG(10) << __func__ << " acked packetNum=" << currentPacketNum
<< " space=" << currentPacketNumberSpace << " handshake="
<< (int)((rPacketIt->metadata.isHandshake) ? 1 : 0) << " "
<< conn;
      // If we hit a packet that has already been declared lost, count the
      // spurious loss and skip all other processing.
      // TODO also remove any stream data from the loss buffer.
if (rPacketIt->declaredLost) {
CHECK_GT(conn.outstandings.declaredLostCount, 0);
conn.lossState.totalPacketsSpuriouslyMarkedLost++;
if (conn.transportSettings.useAdaptiveLossThresholds) {
if (rPacketIt->lossReorderDistance.hasValue() &&
rPacketIt->lossReorderDistance.value() >
conn.lossState.reorderingThreshold) {
conn.lossState.reorderingThreshold =
rPacketIt->lossReorderDistance.value();
}
if (rPacketIt->lossTimeoutDividend.hasValue() &&
rPacketIt->lossTimeoutDividend.value() >
conn.transportSettings.timeReorderingThreshDividend) {
conn.transportSettings.timeReorderingThreshDividend =
rPacketIt->lossTimeoutDividend.value();
}
}
QUIC_STATS(conn.statsCallback, onPacketSpuriousLoss);
        // Decrement the counter; the packet itself will be removed as part of
        // the bulk erase below.
conn.outstandings.declaredLostCount--;
if (spuriousLossEvent) {
spuriousLossEvent->addSpuriousPacket(*rPacketIt);
}
rPacketIt++;
continue;
}
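      // Frames of a cloned packet are fully processed at most once: only
      // while its PacketEvent is still tracked in outstandings.packetEvents.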
bool needsProcess = !rPacketIt->associatedEvent ||
conn.outstandings.packetEvents.count(*rPacketIt->associatedEvent);
if (needsProcess) {
CHECK(conn.outstandings.packetCount[currentPacketNumberSpace]);
--conn.outstandings.packetCount[currentPacketNumberSpace];
}
ack.ackedBytes += rPacketIt->metadata.encodedSize;
if (rPacketIt->associatedEvent) {
CHECK(conn.outstandings.clonedPacketCount[currentPacketNumberSpace]);
--conn.outstandings.clonedPacketCount[currentPacketNumberSpace];
}
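      // Count acked DSR packets so the outstanding DSR counter can be
      // adjusted after the ack blocks have been processed.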
if (rPacketIt->isDSRPacket) {
++dsrPacketsAcked;
}
// Update RTT if current packet is the largestAcked in the frame
//
// An RTT sample is generated using only the largest acknowledged packet
// in the received ACK frame. To avoid generating multiple RTT samples
// for a single packet, an ACK frame SHOULD NOT be used to update RTT
// estimates if it does not newly acknowledge the largest acknowledged
// packet (RFC9002). This includes for minRTT estimates.
if (!ack.implicit && currentPacketNum == frame.largestAcked) {
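        // If the recorded ACK receive time is not after the packet's send
        // time, fall back to the current time so the RTT sample stays
        // positive.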
auto ackReceiveTimeOrNow = ackReceiveTime > rPacketIt->metadata.time
? ackReceiveTime
: nowTime;
// Use ceil to round up to next microsecond during conversion.
//
// While unlikely, it's still technically possible for the RTT to be
// zero; ignore if this is the case.
auto rttSample = std::chrono::ceil<std::chrono::microseconds>(
ackReceiveTimeOrNow - rPacketIt->metadata.time);
if (rttSample != rttSample.zero()) {
// notify observers
const Observer::PacketRTT packetRTT(
ackReceiveTimeOrNow, rttSample, frame.ackDelay, *rPacketIt);
for (const auto& observer : *(conn.observers)) {
conn.pendingCallbacks.emplace_back(
[observer, packetRTT](QuicSocket* qSocket) {
if (observer->getConfig().rttSamples) {
observer->rttSampleGenerated(qSocket, packetRTT);
}
});
}
// update AckEvent RTTs, which are used by CCA and other processing
CHECK(!ack.rttSample.has_value());
CHECK(!ack.rttSampleNoAckDelay.has_value());
ack.rttSample = rttSample;
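          // Only subtract the peer-reported ACK delay when it does not
          // exceed the raw sample; otherwise leave the delay-adjusted
          // sample unset.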
ack.rttSampleNoAckDelay = (rttSample >= frame.ackDelay)
? folly::make_optional(
std::chrono::ceil<std::chrono::microseconds>(
rttSample - frame.ackDelay))
: folly::none;
// update transport RTT
updateRtt(conn, rttSample, frame.ackDelay);
} // if (rttSample != rttSample.zero())
} // if (!ack.implicit && currentPacketNum == frame.largestAcked)
      // A D6D probe was acked. We only trigger a state change if it is the
      // last probe.
if (rPacketIt->metadata.isD6DProbe) {
CHECK(conn.d6d.lastProbe);
if (!rPacketIt->declaredLost) {
++conn.d6d.meta.totalAckedProbes;
if (currentPacketNum == conn.d6d.lastProbe->packetNum) {
onD6DLastProbeAcked(conn);
}
}
}
// Remove this PacketEvent from the outstandings.packetEvents set
if (rPacketIt->associatedEvent) {
conn.outstandings.packetEvents.erase(*rPacketIt->associatedEvent);
}
if (!ack.largestNewlyAckedPacket ||
*ack.largestNewlyAckedPacket < currentPacketNum) {
ack.largestNewlyAckedPacket = currentPacketNum;
ack.largestNewlyAckedPacketSentTime = rPacketIt->metadata.time;
ack.largestNewlyAckedPacketAppLimited = rPacketIt->isAppLimited;
}
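      // Implicit (locally synthesized) ACKs do not update bytes-acked or
      // ACK timing state.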
if (!ack.implicit) {
conn.lossState.totalBytesAcked += rPacketIt->metadata.encodedSize;
conn.lossState.totalBytesSentAtLastAck = conn.lossState.totalBytesSent;
conn.lossState.totalBytesAckedAtLastAck =
conn.lossState.totalBytesAcked;
conn.lossState.totalBodyBytesAcked +=
rPacketIt->metadata.encodedBodySize;
if (!lastAckedPacketSentTime) {
lastAckedPacketSentTime = rPacketIt->metadata.time;
}
conn.lossState.lastAckedTime = ackReceiveTime;
conn.lossState.adjustedLastAckedTime = ackReceiveTime - frame.ackDelay;
}
// temporarily store the packet to facilitate in-order ACK processing
{
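        // Insert before the first stored packet with a larger packet number
        // so the deque stays sorted in ascending packet number (sent) order.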
auto tmpIt = packetsWithHandlerContext.emplace(
std::find_if(
packetsWithHandlerContext.begin(),
packetsWithHandlerContext.end(),
[¤tPacketNum](const auto& packetWithHandlerContext) {
return packetWithHandlerContext.outstandingPacket.packet
.header.getPacketSequenceNum() > currentPacketNum;
}),
std::move(*rPacketIt));
tmpIt->processAllFrames = needsProcess;
}
rPacketIt++;
}
    // Done searching for acked outstanding packets in the current ack block.
    // Erase the current iterator range, which is the last batch of contiguous
    // outstanding packets in this ack block, and move the iterator to the
    // next search point.
if (rPacketIt != eraseEnd) {
auto nextElem =
conn.outstandings.packets.erase(rPacketIt.base(), eraseEnd.base());
currentPacketIt = std::reverse_iterator<decltype(nextElem)>(nextElem);
} else {
currentPacketIt = rPacketIt;
}
ackBlockIt++;
}
  // Always invoke the AckVisitor for WriteAckFrames. Invoke it for other
  // frame types only if the packet doesn't have an associated PacketEvent,
  // or if the PacketEvent is still in conn.outstandings.packetEvents.
ack.ackedPackets.reserve(packetsWithHandlerContext.size());
for (auto& packetWithHandlerContext : packetsWithHandlerContext) {
auto& outstandingPacket = packetWithHandlerContext.outstandingPacket;
const auto processAllFrames = packetWithHandlerContext.processAllFrames;
AckEvent::AckPacket::DetailsPerStream detailsPerStream;
for (auto& packetFrame : outstandingPacket.packet.frames) {
if (!processAllFrames &&
packetFrame.type() != QuicWriteFrame::Type::WriteAckFrame) {
continue; // skip processing this frame
}
// We do a few things here for ACKs of WriteStreamFrames:
// 1. To understand whether the ACK of this frame changes the
// stream's delivery offset, we record the delivery offset before
// running the ackVisitor, run it, and then check if the stream's
// delivery offset changed.
//
// 2. To understand whether the ACK of this frame is redundant (e.g.
// the frame was already ACKed before), we record the version of
// the stream's ACK IntervalSet before running the ackVisitor,
// run it, and then check if the version changed. If it changed,
// we know that _this_ ACK of _this_ frame had an impact.
//
      // 3. If we determine that the ACK of the frame is not redundant,
      //    and the frame was retransmitted, we record the number of bytes
      //    ACKed by a retransmission as well.
      // Part 1: Record the ACK IntervalSet version and delivery offset prior
      // to running the ackVisitor.
struct PreAckVisitorState {
const uint64_t ackIntervalSetVersion;
const folly::Optional<uint64_t> maybeLargestDeliverableOffset;
};
const auto maybePreAckVisitorState =
[&conn](
const auto& packetFrame) -> folly::Optional<PreAckVisitorState> {
// check if it's a WriteStreamFrame being ACKed
if (packetFrame.type() != QuicWriteFrame::Type::WriteStreamFrame) {
return folly::none;
}
// check if the stream is alive (could be ACK for dead stream)
const WriteStreamFrame& ackedFrame = *packetFrame.asWriteStreamFrame();
if (!conn.streamManager->streamExists(ackedFrame.streamId)) {
return folly::none;
}
auto ackedStream =
CHECK_NOTNULL(conn.streamManager->getStream(ackedFrame.streamId));
// stream is alive and frame is WriteStreamFrame
return PreAckVisitorState{
getAckIntervalSetVersion(*ackedStream),
getLargestDeliverableOffset(*ackedStream)};
}(packetFrame);
// run the ACK visitor
ackVisitor(outstandingPacket, packetFrame, frame);
      // Parts 2 and 3: Process the current state relative to the
      // PreAckVisitorState.
if (maybePreAckVisitorState.has_value()) {
const auto& preAckVisitorState = maybePreAckVisitorState.value();
const WriteStreamFrame& ackedFrame = *packetFrame.asWriteStreamFrame();
auto ackedStream =
CHECK_NOTNULL(conn.streamManager->getStream(ackedFrame.streamId));
// determine if this frame was a retransmission
const bool retransmission = ([&outstandingPacket, &ackedFrame]() {
          // In some cases (e.g., some unit tests) stream details are not
          // available; in those cases, we assume it is not a retransmission.
if (const auto maybeStreamDetails = folly::get_optional(
outstandingPacket.metadata.detailsPerStream,
ackedFrame.streamId)) {
const auto& maybeFirstNewStreamByteOffset =
maybeStreamDetails->maybeFirstNewStreamByteOffset;
return (
!maybeFirstNewStreamByteOffset.has_value() ||
maybeFirstNewStreamByteOffset.value() > ackedFrame.offset);
}
return false; // assume not a retransmission
})();
// check for change in ACK IntervalSet version
if (preAckVisitorState.ackIntervalSetVersion !=
getAckIntervalSetVersion(*ackedStream)) {
// we were able to fill in a hole in the ACK interval
detailsPerStream.recordFrameDelivered(ackedFrame, retransmission);
// check for change in delivery offset
const auto maybeLargestDeliverableOffset =
getLargestDeliverableOffset(*ackedStream);
if (preAckVisitorState.maybeLargestDeliverableOffset !=
maybeLargestDeliverableOffset) {
CHECK(maybeLargestDeliverableOffset.has_value());
detailsPerStream.recordDeliveryOffsetUpdate(
ackedFrame.streamId, maybeLargestDeliverableOffset.value());
}
} else {
// we got an ACK of a frame that was already marked as delivered
// when handling the ACK of some earlier packet; mark as such
detailsPerStream.recordFrameAlreadyDelivered(
ackedFrame, retransmission);
// should be no change in delivery offset
DCHECK(
preAckVisitorState.maybeLargestDeliverableOffset ==
getLargestDeliverableOffset(*CHECK_NOTNULL(ackedStream)));
}
}
}
ack.ackedPackets.emplace_back(
CongestionController::AckEvent::AckPacket::Builder()
.setPacketNum(
outstandingPacket.packet.header.getPacketSequenceNum())
.setOutstandingPacketMetadata(std::move(outstandingPacket.metadata))
.setDetailsPerStream(std::move(detailsPerStream))
.setLastAckedPacketInfo(
std::move(outstandingPacket.lastAckedPacketInfo))
.setAppLimited(outstandingPacket.isAppLimited)
.build());
}
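  // Packets were visited from newest to oldest, so lastAckedPacketSentTime
  // holds the send time of the most recently sent newly acked packet.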
if (lastAckedPacketSentTime) {
conn.lossState.lastAckedPacketSentTime = *lastAckedPacketSentTime;
}
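  // Remove the acked DSR packets from the outstanding DSR packet count.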
CHECK_GE(conn.outstandings.dsrCount, dsrPacketsAcked);
conn.outstandings.dsrCount -= dsrPacketsAcked;
CHECK_GE(
conn.outstandings.packets.size(), conn.outstandings.declaredLostCount);
  auto updatedOutstandingPacketsCount = conn.outstandings.numOutstanding();
const auto& packetCount = conn.outstandings.packetCount;
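  // Sanity check the bookkeeping: the per-space packet counts should never
  // exceed the total number of outstanding packets.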
LOG_IF(
DFATAL,
      updatedOutstandingPacketsCount <
packetCount[PacketNumberSpace::Handshake] +
packetCount[PacketNumberSpace::Initial] +
packetCount[PacketNumberSpace::AppData])
<< "QUIC packetCount inconsistency: "
"numOutstanding: "
<< updatedOustandingPacketsCount << " packetCount: {"
<< packetCount[PacketNumberSpace::Initial] << ","
<< packetCount[PacketNumberSpace::Handshake] << ","
<< packetCount[PacketNumberSpace::AppData] << "}"
<< " originalPacketCount: {"
<< originalPacketCount[PacketNumberSpace::Initial] << ","
<< originalPacketCount[PacketNumberSpace::Handshake] << ","
<< originalPacketCount[PacketNumberSpace::AppData] << "}";
  CHECK_GE(
      updatedOutstandingPacketsCount, conn.outstandings.numClonedPackets());
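  // Run ACK-triggered loss detection; any packets newly declared lost here
  // are reported alongside this ACK to the congestion controller below.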
auto lossEvent = handleAckForLoss(conn, lossVisitor, ack, pnSpace);
if (conn.congestionController &&
(ack.largestNewlyAckedPacket.has_value() || lossEvent)) {
if (lossEvent) {
CHECK(lossEvent->largestLostSentTime && lossEvent->smallestLostSentTime);
// TODO it's not clear that we should be using the smallest and largest
// lost times here. It may perhaps be better to only consider the latest
// contiguous lost block and determine if that block is larger than the
// congestion period. Alternatively we could consider every lost block
// and check if any of them constitute persistent congestion.
lossEvent->persistentCongestion = isPersistentCongestion(
conn.lossState.srtt == 0s ? folly::none
: folly::Optional(calculatePTO(conn)),
*lossEvent->smallestLostSentTime,
*lossEvent->largestLostSentTime,
ack);
if (lossEvent->persistentCongestion) {
QUIC_STATS(conn.statsCallback, onPersistentCongestion);
}
}
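    // Feed the ACK (and any loss event) to the congestion controller and
    // record its updated state on the AckEvent.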
conn.congestionController->onPacketAckOrLoss(&ack, lossEvent.get_pointer());
ack.ccState = conn.congestionController->getState();
}
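  // Clean up outstanding packet records that have aged out and no longer
  // need to be tracked for this packet number space.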
clearOldOutstandingPackets(conn, ackReceiveTime, pnSpace);
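  // Notify interested observers of any spurious losses detected while
  // processing this ACK.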
if (spuriousLossEvent && spuriousLossEvent->hasPackets()) {
for (const auto& observer : *(conn.observers)) {
conn.pendingCallbacks.emplace_back(
[observer, spuriousLossEvent](QuicSocket* qSocket) {
if (observer->getConfig().spuriousLossEvents) {
observer->spuriousLossDetected(qSocket, *spuriousLossEvent);
}
});
}
}
return ack;
}