AckEvent processAckFrame()

in quic/state/AckHandlers.cpp [50:475]


/**
 * Processes an ACK frame received in packet number space `pnSpace` and returns
 * an AckEvent describing what it newly acknowledged.
 *
 * The ACK ranges in `frame.ackBlocks` are visited in order (the loop assumes
 * they are sorted by descending endPacket) and matched against
 * conn.outstandings.packets walked from newest to oldest. For every
 * outstanding packet of `pnSpace` covered by a range:
 *   - a packet previously declared lost is counted as a spurious loss (and,
 *     with useAdaptiveLossThresholds, may raise the reordering / time
 *     thresholds), then dropped without further processing;
 *   - any other packet updates the per-space, cloned and DSR counters, the RTT
 *     (only for frame.largestAcked and only for non-implicit ACKs), D6D probe
 *     state and the loss-state byte counters, and is collected for in-order
 *     processing.
 * Every matched packet is erased from conn.outstandings.packets.
 *
 * The collected packets are then visited in ascending packet-number order:
 * `ackVisitor` is called for each WriteAckFrame, and for every other frame
 * only when the packet has no associated PacketEvent or that event was still
 * present in conn.outstandings.packetEvents. For WriteStreamFrames the stream's
 * ACK interval-set version and largest deliverable offset are compared before
 * and after the visitor to fill per-stream delivery details.
 *
 * Finally, handleAckForLoss() runs loss detection with `lossVisitor`, the
 * congestion controller (if any) is notified, old outstanding packets are
 * cleared, and observer callbacks (RTT samples, spurious losses) are queued on
 * conn.pendingCallbacks.
 *
 * @param conn            Connection state; mutated extensively.
 * @param pnSpace         Packet number space this ACK frame belongs to.
 * @param frame           The decoded ACK frame.
 * @param ackVisitor      Called per (packet, frame) for newly acked packets.
 * @param lossVisitor     Forwarded to handleAckForLoss().
 * @param ackReceiveTime  Time the ACK was received; used for RTT and as the
 *                        ack time recorded in the event and loss state.
 * @return The AckEvent for this ACK (acked packets, bytes, RTT samples, ...).
 */
AckEvent processAckFrame(
    QuicConnectionStateBase& conn,
    PacketNumberSpace pnSpace,
    const ReadAckFrame& frame,
    const AckVisitor& ackVisitor,
    const LossVisitor& lossVisitor,
    const TimePoint& ackReceiveTime) {
  const auto nowTime = Clock::now();

  // TODO: send error if we get an ack for a packet we've not sent t18721184
  auto ack = AckEvent::Builder()
                 .setAckTime(ackReceiveTime)
                 .setAdjustedAckTime(ackReceiveTime - frame.ackDelay)
                 .setAckDelay(frame.ackDelay)
                 .setPacketNumberSpace(pnSpace)
                 .setLargestAckedPacket(frame.largestAcked)
                 .setIsImplicitAck(frame.implicit)
                 .build();

  // temporary storage to enable packets to be processed in sent order
  std::deque<OutstandingPacketWithHandlerContext> packetsWithHandlerContext;

  // Reverse iterator into conn.outstandings.packets from which the search for
  // acked packets starts; advanced past each processed ack block.
  auto currentPacketIt = getLastOutstandingPacketIncludingLost(conn, pnSpace);
  uint64_t dsrPacketsAcked = 0;
  folly::Optional<decltype(conn.lossState.lastAckedPacketSentTime)>
      lastAckedPacketSentTime;
  folly::Optional<Observer::SpuriousLossEvent> spuriousLossEvent;
  // Used for debug only.
  const auto originalPacketCount = conn.outstandings.packetCount;
  if (conn.observers->size() > 0) {
    spuriousLossEvent.emplace(ackReceiveTime);
  }
  auto ackBlockIt = frame.ackBlocks.cbegin();
  while (ackBlockIt != frame.ackBlocks.cend() &&
         currentPacketIt != conn.outstandings.packets.rend()) {
    // In reverse order, find the first outstanding packet that has a packet
    // number LE the endPacket of the current ack range.
    auto rPacketIt = std::lower_bound(
        currentPacketIt,
        conn.outstandings.packets.rend(),
        ackBlockIt->endPacket,
        [&](const auto& packetWithTime, const auto& val) {
          return packetWithTime.packet.header.getPacketSequenceNum() > val;
        });
    if (rPacketIt == conn.outstandings.packets.rend()) {
      // This means that all the packets are greater than the end packet.
      // Since we iterate the ACK blocks in reverse order of end packets, our
      // work here is done.
      VLOG(10) << __func__ << " less than all outstanding packets outstanding="
               << conn.outstandings.numOutstanding() << " range=["
               << ackBlockIt->startPacket << ", " << ackBlockIt->endPacket
               << "]"
               << " " << conn;
      break;
    }

    // [rPacketIt, eraseEnd) in reverse order is the current run of contiguous
    // acked packets in this space; it is bulk-erased once the run ends.
    auto eraseEnd = rPacketIt;
    while (rPacketIt != conn.outstandings.packets.rend()) {
      auto currentPacketNum = rPacketIt->packet.header.getPacketSequenceNum();
      auto currentPacketNumberSpace =
          rPacketIt->packet.header.getPacketNumberSpace();
      if (pnSpace != currentPacketNumberSpace) {
        // When the next packet is not in the same packet number space, we need
        // to skip it in current ack processing. If the iterator has moved, that
        // means we have found packets in the current space that are acked by
        // this ack block. So the code erases the current iterator range and
        // move the iterator to be the next search point.
        if (rPacketIt != eraseEnd) {
          auto nextElem = conn.outstandings.packets.erase(
              rPacketIt.base(), eraseEnd.base());
          // reverse_iterator(nextElem) refers to the element just before
          // nextElem, i.e. the foreign-space packet; +1 steps past it.
          rPacketIt = std::reverse_iterator<decltype(nextElem)>(nextElem) + 1;
          eraseEnd = rPacketIt;
        } else {
          rPacketIt++;
          eraseEnd = rPacketIt;
        }
        continue;
      }
      if (currentPacketNum < ackBlockIt->startPacket) {
        break;
      }
      VLOG(10) << __func__ << " acked packetNum=" << currentPacketNum
               << " space=" << currentPacketNumberSpace << " handshake="
               << (rPacketIt->metadata.isHandshake ? 1 : 0) << " " << conn;
      // If we hit a packet which has been lost we need to count the spurious
      // loss and ignore all other processing.
      // TODO also remove any stream data from the loss buffer.
      if (rPacketIt->declaredLost) {
        CHECK_GT(conn.outstandings.declaredLostCount, 0);
        conn.lossState.totalPacketsSpuriouslyMarkedLost++;
        if (conn.transportSettings.useAdaptiveLossThresholds) {
          if (rPacketIt->lossReorderDistance.hasValue() &&
              rPacketIt->lossReorderDistance.value() >
                  conn.lossState.reorderingThreshold) {
            conn.lossState.reorderingThreshold =
                rPacketIt->lossReorderDistance.value();
          }
          if (rPacketIt->lossTimeoutDividend.hasValue() &&
              rPacketIt->lossTimeoutDividend.value() >
                  conn.transportSettings.timeReorderingThreshDividend) {
            conn.transportSettings.timeReorderingThreshDividend =
                rPacketIt->lossTimeoutDividend.value();
          }
        }
        QUIC_STATS(conn.statsCallback, onPacketSpuriousLoss);
        // Decrement the counter, trust that we will erase this as part of
        // the bulk erase.
        conn.outstandings.declaredLostCount--;
        if (spuriousLossEvent) {
          spuriousLossEvent->addSpuriousPacket(*rPacketIt);
        }
        rPacketIt++;
        continue;
      }
      // A packet needs full processing unless it is a clone whose PacketEvent
      // has already been removed (i.e. another copy was acked earlier).
      bool needsProcess = !rPacketIt->associatedEvent ||
          conn.outstandings.packetEvents.count(*rPacketIt->associatedEvent);
      if (needsProcess) {
        CHECK(conn.outstandings.packetCount[currentPacketNumberSpace]);
        --conn.outstandings.packetCount[currentPacketNumberSpace];
      }
      ack.ackedBytes += rPacketIt->metadata.encodedSize;
      if (rPacketIt->associatedEvent) {
        CHECK(conn.outstandings.clonedPacketCount[currentPacketNumberSpace]);
        --conn.outstandings.clonedPacketCount[currentPacketNumberSpace];
      }
      if (rPacketIt->isDSRPacket) {
        ++dsrPacketsAcked;
      }

      // Update RTT if current packet is the largestAcked in the frame
      //
      // An RTT sample is generated using only the largest acknowledged packet
      // in the received ACK frame. To avoid generating multiple RTT samples
      // for a single packet, an ACK frame SHOULD NOT be used to update RTT
      // estimates if it does not newly acknowledge the largest acknowledged
      // packet (RFC9002). This includes for minRTT estimates.
      if (!ack.implicit && currentPacketNum == frame.largestAcked) {
        auto ackReceiveTimeOrNow = ackReceiveTime > rPacketIt->metadata.time
            ? ackReceiveTime
            : nowTime;

        // Use ceil to round up to next microsecond during conversion.
        //
        // While unlikely, it's still technically possible for the RTT to be
        // zero; ignore if this is the case.
        auto rttSample = std::chrono::ceil<std::chrono::microseconds>(
            ackReceiveTimeOrNow - rPacketIt->metadata.time);
        if (rttSample != rttSample.zero()) {
          // notify observers
          const Observer::PacketRTT packetRTT(
              ackReceiveTimeOrNow, rttSample, frame.ackDelay, *rPacketIt);
          for (const auto& observer : *(conn.observers)) {
            conn.pendingCallbacks.emplace_back(
                [observer, packetRTT](QuicSocket* qSocket) {
                  if (observer->getConfig().rttSamples) {
                    observer->rttSampleGenerated(qSocket, packetRTT);
                  }
                });
          }

          // update AckEvent RTTs, which are used by CCA and other processing
          CHECK(!ack.rttSample.has_value());
          CHECK(!ack.rttSampleNoAckDelay.has_value());
          ack.rttSample = rttSample;
          ack.rttSampleNoAckDelay = (rttSample >= frame.ackDelay)
              ? folly::make_optional(
                    std::chrono::ceil<std::chrono::microseconds>(
                        rttSample - frame.ackDelay))
              : folly::none;

          // update transport RTT
          updateRtt(conn, rttSample, frame.ackDelay);
        } // if (rttSample != rttSample.zero())
      } // if (!ack.implicit && currentPacketNum == frame.largestAcked)

      // D6D probe acked. Only if it's for the last probe do we trigger a
      // state change. Packets declared lost never reach this point (they
      // `continue` above), so every probe seen here is genuinely acked.
      if (rPacketIt->metadata.isD6DProbe) {
        CHECK(conn.d6d.lastProbe);
        ++conn.d6d.meta.totalAckedProbes;
        if (currentPacketNum == conn.d6d.lastProbe->packetNum) {
          onD6DLastProbeAcked(conn);
        }
      }
      // Remove this PacketEvent from the outstandings.packetEvents set
      if (rPacketIt->associatedEvent) {
        conn.outstandings.packetEvents.erase(*rPacketIt->associatedEvent);
      }
      if (!ack.largestNewlyAckedPacket ||
          *ack.largestNewlyAckedPacket < currentPacketNum) {
        ack.largestNewlyAckedPacket = currentPacketNum;
        ack.largestNewlyAckedPacketSentTime = rPacketIt->metadata.time;
        ack.largestNewlyAckedPacketAppLimited = rPacketIt->isAppLimited;
      }
      if (!ack.implicit) {
        conn.lossState.totalBytesAcked += rPacketIt->metadata.encodedSize;
        conn.lossState.totalBytesSentAtLastAck = conn.lossState.totalBytesSent;
        conn.lossState.totalBytesAckedAtLastAck =
            conn.lossState.totalBytesAcked;
        conn.lossState.totalBodyBytesAcked +=
            rPacketIt->metadata.encodedBodySize;
        // Packets are visited newest-first, so the first one recorded here is
        // the most recently sent packet acked by this frame.
        if (!lastAckedPacketSentTime) {
          lastAckedPacketSentTime = rPacketIt->metadata.time;
        }
        conn.lossState.lastAckedTime = ackReceiveTime;
        conn.lossState.adjustedLastAckedTime = ackReceiveTime - frame.ackDelay;
      }

      // temporarily store the packet to facilitate in-order ACK processing;
      // inserting before the first larger packet number keeps the deque
      // sorted ascending (since packets are visited in descending order this
      // is normally the front of the deque).
      {
        auto tmpIt = packetsWithHandlerContext.emplace(
            std::find_if(
                packetsWithHandlerContext.begin(),
                packetsWithHandlerContext.end(),
                [&currentPacketNum](const auto& packetWithHandlerContext) {
                  return packetWithHandlerContext.outstandingPacket.packet
                             .header.getPacketSequenceNum() > currentPacketNum;
                }),
            std::move(*rPacketIt));
        tmpIt->processAllFrames = needsProcess;
      }

      rPacketIt++;
    }
    // Done searching for acked outstanding packets in current ack block. Erase
    // the current iterator range which is the last batch of continuous
    // outstanding packets that are in this ack block. Move the iterator to be
    // the next search point.
    if (rPacketIt != eraseEnd) {
      auto nextElem =
          conn.outstandings.packets.erase(rPacketIt.base(), eraseEnd.base());
      currentPacketIt = std::reverse_iterator<decltype(nextElem)>(nextElem);
    } else {
      currentPacketIt = rPacketIt;
    }
    ackBlockIt++;
  }

  // Invoke AckVisitor for WriteAckFrames all the time. Invoke it for other
  // frame types only if the packet doesn't have an associated PacketEvent;
  // or the PacketEvent is in conn.outstandings.packetEvents
  ack.ackedPackets.reserve(packetsWithHandlerContext.size());
  for (auto& packetWithHandlerContext : packetsWithHandlerContext) {
    auto& outstandingPacket = packetWithHandlerContext.outstandingPacket;
    const auto processAllFrames = packetWithHandlerContext.processAllFrames;
    AckEvent::AckPacket::DetailsPerStream detailsPerStream;
    for (auto& packetFrame : outstandingPacket.packet.frames) {
      if (!processAllFrames &&
          packetFrame.type() != QuicWriteFrame::Type::WriteAckFrame) {
        continue; // skip processing this frame
      }

      // We do a few things here for ACKs of WriteStreamFrames:
      //  1. To understand whether the ACK of this frame changes the
      //     stream's delivery offset, we record the delivery offset before
      //     running the ackVisitor, run it, and then check if the stream's
      //     delivery offset changed.
      //
      //  2. To understand whether the ACK of this frame is redundant (e.g.
      //     the frame was already ACKed before), we record the version of
      //     the stream's ACK IntervalSet before running the ackVisitor,
      //     run it, and then check if the version changed. If it changed,
      //     we know that _this_ ACK of _this_ frame had an impact.
      //
      //  3. If we determine that the ACK of the frame is not-redundant,
      //     and the frame was retransmitted, we record the number of bytes
      //     ACKed by a retransmit as well.

      // Part 1: Record delivery offset prior to running ackVisitor.
      struct PreAckVisitorState {
        const uint64_t ackIntervalSetVersion;
        const folly::Optional<uint64_t> maybeLargestDeliverableOffset;
      };
      const auto maybePreAckVisitorState =
          [&conn](
              const auto& packetFrame) -> folly::Optional<PreAckVisitorState> {
        // check if it's a WriteStreamFrame being ACKed
        if (packetFrame.type() != QuicWriteFrame::Type::WriteStreamFrame) {
          return folly::none;
        }

        // check if the stream is alive (could be ACK for dead stream)
        const WriteStreamFrame& ackedFrame = *packetFrame.asWriteStreamFrame();
        if (!conn.streamManager->streamExists(ackedFrame.streamId)) {
          return folly::none;
        }
        auto ackedStream =
            CHECK_NOTNULL(conn.streamManager->getStream(ackedFrame.streamId));

        // stream is alive and frame is WriteStreamFrame
        return PreAckVisitorState{
            getAckIntervalSetVersion(*ackedStream),
            getLargestDeliverableOffset(*ackedStream)};
      }(packetFrame);

      // run the ACK visitor
      ackVisitor(outstandingPacket, packetFrame, frame);

      // Part 2 and 3: Process current state relative to the PreAckVistorState.
      if (maybePreAckVisitorState.has_value()) {
        const auto& preAckVisitorState = maybePreAckVisitorState.value();
        const WriteStreamFrame& ackedFrame = *packetFrame.asWriteStreamFrame();
        auto ackedStream =
            CHECK_NOTNULL(conn.streamManager->getStream(ackedFrame.streamId));

        // determine if this frame was a retransmission
        const bool retransmission = ([&outstandingPacket, &ackedFrame]() {
          // in some cases (some unit tests), stream details are not available
          // in these cases, we assume it is not a retransmission
          if (const auto maybeStreamDetails = folly::get_optional(
                  outstandingPacket.metadata.detailsPerStream,
                  ackedFrame.streamId)) {
            const auto& maybeFirstNewStreamByteOffset =
                maybeStreamDetails->maybeFirstNewStreamByteOffset;
            return (
                !maybeFirstNewStreamByteOffset.has_value() ||
                maybeFirstNewStreamByteOffset.value() > ackedFrame.offset);
          }
          return false; // assume not a retransmission
        })();

        // check for change in ACK IntervalSet version
        if (preAckVisitorState.ackIntervalSetVersion !=
            getAckIntervalSetVersion(*ackedStream)) {
          // we were able to fill in a hole in the ACK interval
          detailsPerStream.recordFrameDelivered(ackedFrame, retransmission);

          // check for change in delivery offset
          const auto maybeLargestDeliverableOffset =
              getLargestDeliverableOffset(*ackedStream);
          if (preAckVisitorState.maybeLargestDeliverableOffset !=
              maybeLargestDeliverableOffset) {
            CHECK(maybeLargestDeliverableOffset.has_value());
            detailsPerStream.recordDeliveryOffsetUpdate(
                ackedFrame.streamId, maybeLargestDeliverableOffset.value());
          }
        } else {
          // we got an ACK of a frame that was already marked as delivered
          // when handling the ACK of some earlier packet; mark as such
          detailsPerStream.recordFrameAlreadyDelivered(
              ackedFrame, retransmission);

          // should be no change in delivery offset (ackedStream was already
          // verified non-null above)
          DCHECK(
              preAckVisitorState.maybeLargestDeliverableOffset ==
              getLargestDeliverableOffset(*ackedStream));
        }
      }
    }
    ack.ackedPackets.emplace_back(
        CongestionController::AckEvent::AckPacket::Builder()
            .setPacketNum(
                outstandingPacket.packet.header.getPacketSequenceNum())
            .setOutstandingPacketMetadata(std::move(outstandingPacket.metadata))
            .setDetailsPerStream(std::move(detailsPerStream))
            .setLastAckedPacketInfo(
                std::move(outstandingPacket.lastAckedPacketInfo))
            .setAppLimited(outstandingPacket.isAppLimited)
            .build());
  }
  if (lastAckedPacketSentTime) {
    conn.lossState.lastAckedPacketSentTime = *lastAckedPacketSentTime;
  }
  CHECK_GE(conn.outstandings.dsrCount, dsrPacketsAcked);
  conn.outstandings.dsrCount -= dsrPacketsAcked;
  CHECK_GE(
      conn.outstandings.packets.size(), conn.outstandings.declaredLostCount);
  auto updatedOutstandingPacketsCount = conn.outstandings.numOutstanding();
  const auto& packetCount = conn.outstandings.packetCount;
  LOG_IF(
      DFATAL,
      updatedOutstandingPacketsCount <
          packetCount[PacketNumberSpace::Handshake] +
              packetCount[PacketNumberSpace::Initial] +
              packetCount[PacketNumberSpace::AppData])
      << "QUIC packetCount inconsistency: "
         "numOutstanding: "
      << updatedOutstandingPacketsCount << " packetCount: {"
      << packetCount[PacketNumberSpace::Initial] << ","
      << packetCount[PacketNumberSpace::Handshake] << ","
      << packetCount[PacketNumberSpace::AppData] << "}"
      << " originalPacketCount: {"
      << originalPacketCount[PacketNumberSpace::Initial] << ","
      << originalPacketCount[PacketNumberSpace::Handshake] << ","
      << originalPacketCount[PacketNumberSpace::AppData] << "}";
  CHECK_GE(updatedOutstandingPacketsCount, conn.outstandings.numClonedPackets());
  auto lossEvent = handleAckForLoss(conn, lossVisitor, ack, pnSpace);
  if (conn.congestionController &&
      (ack.largestNewlyAckedPacket.has_value() || lossEvent)) {
    if (lossEvent) {
      CHECK(lossEvent->largestLostSentTime && lossEvent->smallestLostSentTime);
      // TODO it's not clear that we should be using the smallest and largest
      // lost times here. It may perhaps be better to only consider the latest
      // contiguous lost block and determine if that block is larger than the
      // congestion period. Alternatively we could consider every lost block
      // and check if any of them constitute persistent congestion.
      lossEvent->persistentCongestion = isPersistentCongestion(
          conn.lossState.srtt == 0s ? folly::none
                                    : folly::Optional(calculatePTO(conn)),
          *lossEvent->smallestLostSentTime,
          *lossEvent->largestLostSentTime,
          ack);
      if (lossEvent->persistentCongestion) {
        QUIC_STATS(conn.statsCallback, onPersistentCongestion);
      }
    }
    conn.congestionController->onPacketAckOrLoss(&ack, lossEvent.get_pointer());
    ack.ccState = conn.congestionController->getState();
  }
  clearOldOutstandingPackets(conn, ackReceiveTime, pnSpace);
  if (spuriousLossEvent && spuriousLossEvent->hasPackets()) {
    for (const auto& observer : *(conn.observers)) {
      conn.pendingCallbacks.emplace_back(
          [observer, spuriousLossEvent](QuicSocket* qSocket) {
            if (observer->getConfig().spuriousLossEvents) {
              observer->spuriousLossDetected(qSocket, *spuriousLossEvent);
            }
          });
    }
  }
  return ack;
}