quic/congestion_control/Copa.cpp (290 lines of code) (raw):

/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * This source code is licensed under the MIT license found in the * LICENSE file in the root directory of this source tree. */ #include <quic/congestion_control/Copa.h> #include <quic/common/TimeUtil.h> #include <quic/congestion_control/CongestionControlFunctions.h> #include <quic/logging/QLoggerConstants.h> namespace quic { using namespace std::chrono; Copa::Copa(QuicConnectionStateBase& conn) : conn_(conn), cwndBytes_(conn.transportSettings.initCwndInMss * conn.udpSendPacketLen), isSlowStart_(true), minRTTFilter_(kMinRTTWindowLength.count(), 0us, 0), standingRTTFilter_( 100000, /*100ms*/ 0us, 0) { VLOG(10) << __func__ << " writable=" << getWritableBytes() << " cwnd=" << cwndBytes_ << " inflight=" << conn_.lossState.inflightBytes << " " << conn_; if (conn_.transportSettings.copaDeltaParam.has_value()) { deltaParam_ = conn_.transportSettings.copaDeltaParam.value(); } useRttStanding_ = conn_.transportSettings.copaUseRttStanding; } void Copa::onRemoveBytesFromInflight(uint64_t bytes) { subtractAndCheckUnderflow(conn_.lossState.inflightBytes, bytes); VLOG(10) << __func__ << " writable=" << getWritableBytes() << " cwnd=" << cwndBytes_ << " inflight=" << conn_.lossState.inflightBytes << " " << conn_; if (conn_.qLogger) { conn_.qLogger->addCongestionMetricUpdate( conn_.lossState.inflightBytes, getCongestionWindow(), kRemoveInflight); } } void Copa::onPacketSent(const OutstandingPacket& packet) { addAndCheckOverflow( conn_.lossState.inflightBytes, packet.metadata.encodedSize); VLOG(10) << __func__ << " writable=" << getWritableBytes() << " cwnd=" << cwndBytes_ << " inflight=" << conn_.lossState.inflightBytes << " bytesBufferred=" << conn_.flowControlState.sumCurStreamBufferLen << " packetNum=" << packet.packet.header.getPacketSequenceNum() << " " << conn_; if (conn_.qLogger) { conn_.qLogger->addCongestionMetricUpdate( conn_.lossState.inflightBytes, getCongestionWindow(), kCongestionPacketSent); } } /** * Once per window, the sender * compares the current cwnd to the cwnd value at * the time that the latest acknowledged packet was * sent (i.e., cwnd at the start of the current window). * If the current cwnd is larger, then set direction to * 'up'; if it is smaller, then set direction to 'down'. * Now, if direction is the same as in the previous * window, then double v. If not, then reset v to 1. * However, start doubling v only after the direction * has remained the same for three RTTs */ void Copa::checkAndUpdateDirection(const TimePoint ackTime) { if (!velocityState_.lastCwndRecordTime.has_value()) { velocityState_.lastCwndRecordTime = ackTime; velocityState_.lastRecordedCwndBytes = cwndBytes_; return; } auto elapsed_time = ackTime - velocityState_.lastCwndRecordTime.value(); VLOG(10) << __func__ << " elapsed time for direction update " << elapsed_time.count() << ", srtt " << conn_.lossState.srtt.count() << " " << conn_; if (elapsed_time >= conn_.lossState.srtt) { auto newDirection = cwndBytes_ > velocityState_.lastRecordedCwndBytes ? VelocityState::Direction::Up : VelocityState::Direction::Down; if (newDirection != velocityState_.direction) { // if direction changes, change velocity to 1 velocityState_.velocity = 1; velocityState_.numTimesDirectionSame = 0; } else { velocityState_.numTimesDirectionSame++; uint64_t velocityDirectionThreshold = 3; if (useRttStanding_) { velocityDirectionThreshold = 4; } if (velocityState_.numTimesDirectionSame >= velocityDirectionThreshold) { velocityState_.velocity = 2 * velocityState_.velocity; } } VLOG(10) << __func__ << " updated direction from " << velocityState_.direction << " to " << newDirection << " velocityState_.numTimesDirectionSame " << velocityState_.numTimesDirectionSame << " velocity " << velocityState_.velocity << " " << conn_; velocityState_.direction = newDirection; velocityState_.lastCwndRecordTime = ackTime; velocityState_.lastRecordedCwndBytes = cwndBytes_; } } void Copa::changeDirection( VelocityState::Direction newDirection, const TimePoint ackTime) { if (velocityState_.direction == newDirection) { return; } VLOG(10) << __func__ << " Suddenly direction change to " << newDirection << " " << conn_; velocityState_.direction = newDirection; velocityState_.velocity = 1; velocityState_.numTimesDirectionSame = 0; velocityState_.lastCwndRecordTime = ackTime; velocityState_.lastRecordedCwndBytes = cwndBytes_; } void Copa::onPacketAckOrLoss( const AckEvent* FOLLY_NULLABLE ack, const LossEvent* FOLLY_NULLABLE loss) { if (loss) { onPacketLoss(*loss); if (conn_.pacer) { conn_.pacer->onPacketsLoss(); } } if (ack && ack->largestNewlyAckedPacket.has_value()) { onPacketAcked(*ack); } } void Copa::onPacketAcked(const AckEvent& ack) { DCHECK(ack.largestNewlyAckedPacket.has_value()); subtractAndCheckUnderflow(conn_.lossState.inflightBytes, ack.ackedBytes); minRTTFilter_.Update( conn_.lossState.lrtt, std::chrono::duration_cast<microseconds>(ack.ackTime.time_since_epoch()) .count()); auto rttMin = minRTTFilter_.GetBest(); if (useRttStanding_) { standingRTTFilter_.SetWindowLength(conn_.lossState.srtt.count()); } else { standingRTTFilter_.SetWindowLength(conn_.lossState.srtt.count() / 2); } standingRTTFilter_.Update( conn_.lossState.lrtt, std::chrono::duration_cast<microseconds>(ack.ackTime.time_since_epoch()) .count()); auto rttStandingMicroSec = standingRTTFilter_.GetBest().count(); VLOG(10) << __func__ << "ack size=" << ack.ackedBytes << " num packets acked=" << ack.ackedBytes / conn_.udpSendPacketLen << " writable=" << getWritableBytes() << " cwnd=" << cwndBytes_ << " inflight=" << conn_.lossState.inflightBytes << " rttMin=" << rttMin.count() << " sRTT=" << conn_.lossState.srtt.count() << " lRTT=" << conn_.lossState.lrtt.count() << " mRTT=" << conn_.lossState.mrtt.count() << " rttvar=" << conn_.lossState.rttvar.count() << " packetsBufferred=" << conn_.flowControlState.sumCurStreamBufferLen << " packetsRetransmitted=" << conn_.lossState.rtxCount << " " << conn_; if (conn_.qLogger) { conn_.qLogger->addCongestionMetricUpdate( conn_.lossState.inflightBytes, getCongestionWindow(), kCongestionPacketAck); } if (rttStandingMicroSec < rttMin.count()) { VLOG(3) << __func__ << "delay negative, rttStanding=" << rttStandingMicroSec << " rttMin=" << rttMin.count() << " " << conn_; return; } uint64_t delayInMicroSec; if (useRttStanding_) { delayInMicroSec = rttStandingMicroSec - rttMin.count(); } else { delayInMicroSec = duration_cast<microseconds>(conn_.lossState.lrtt - rttMin).count(); } if (rttStandingMicroSec == 0) { VLOG(3) << __func__ << "rttStandingMicroSec zero, lrtt = " << conn_.lossState.lrtt.count() << " rttMin=" << rttMin.count() << " " << conn_; return; } VLOG(10) << __func__ << " estimated queuing delay microsec =" << delayInMicroSec << " " << conn_; bool increaseCwnd = false; if (delayInMicroSec == 0) { // taking care of inf targetRate case here, this happens in beginning where // we do want to increase cwnd increaseCwnd = true; } else { auto targetRate = (1.0 * conn_.udpSendPacketLen * 1000000) / (deltaParam_ * delayInMicroSec); auto currentRate = (1.0 * cwndBytes_ * 1000000) / rttStandingMicroSec; VLOG(10) << __func__ << " estimated target rate=" << targetRate << " current rate=" << currentRate << " " << conn_; increaseCwnd = targetRate >= currentRate; } if (!(increaseCwnd && isSlowStart_)) { // Update direction except for the case where we are in slow start mode, checkAndUpdateDirection(ack.ackTime); } if (increaseCwnd) { if (isSlowStart_) { // When a flow starts, Copa performs slow-start where // cwnd doubles once per RTT until current rate exceeds target rate". if (!lastCwndDoubleTime_.has_value()) { lastCwndDoubleTime_ = ack.ackTime; } else if ( ack.ackTime - lastCwndDoubleTime_.value() > conn_.lossState.srtt) { VLOG(10) << __func__ << " doubling cwnd per RTT from=" << cwndBytes_ << " due to slow start" << " " << conn_; addAndCheckOverflow(cwndBytes_, cwndBytes_); lastCwndDoubleTime_ = ack.ackTime; } } else { if (velocityState_.direction != VelocityState::Direction::Up && velocityState_.velocity > 1.0) { // if our current rate is much different than target, we double v every // RTT. That could result in a high v at some point in time. If we // detect a sudden direction change here, while v is still very high but // meant for opposite direction, we should reset it to 1. changeDirection(VelocityState::Direction::Up, ack.ackTime); } uint64_t addition = (ack.ackedPackets.size() * conn_.udpSendPacketLen * conn_.udpSendPacketLen * velocityState_.velocity) / (deltaParam_ * cwndBytes_); VLOG(10) << __func__ << " increasing cwnd from=" << cwndBytes_ << " by " << addition << " " << conn_; addAndCheckOverflow(cwndBytes_, addition); } } else { if (velocityState_.direction != VelocityState::Direction::Down && velocityState_.velocity > 1.0) { // if our current rate is much different than target, we double v every // RTT. That could result in a high v at some point in time. If we detect // a sudden direction change here, while v is still very high but meant // for opposite direction, we should reset it to 1. changeDirection(VelocityState::Direction::Down, ack.ackTime); } uint64_t reduction = (ack.ackedPackets.size() * conn_.udpSendPacketLen * conn_.udpSendPacketLen * velocityState_.velocity) / (deltaParam_ * cwndBytes_); VLOG(10) << __func__ << " decreasing cwnd from=" << cwndBytes_ << " by " << reduction << " " << conn_; isSlowStart_ = false; subtractAndCheckUnderflow( cwndBytes_, std::min<uint64_t>( reduction, cwndBytes_ - conn_.transportSettings.minCwndInMss * conn_.udpSendPacketLen)); } if (conn_.pacer) { conn_.pacer->refreshPacingRate(cwndBytes_ * 2, conn_.lossState.srtt); } } void Copa::onPacketLoss(const LossEvent& loss) { VLOG(10) << __func__ << " lostBytes=" << loss.lostBytes << " lostPackets=" << loss.lostPackets << " cwnd=" << cwndBytes_ << " inflight=" << conn_.lossState.inflightBytes << " " << conn_; if (conn_.qLogger) { conn_.qLogger->addCongestionMetricUpdate( conn_.lossState.inflightBytes, getCongestionWindow(), kCongestionPacketLoss); } DCHECK(loss.largestLostPacketNum.has_value()); subtractAndCheckUnderflow(conn_.lossState.inflightBytes, loss.lostBytes); if (loss.persistentCongestion) { // TODO See if we should go to slowStart here VLOG(10) << __func__ << " writable=" << getWritableBytes() << " cwnd=" << cwndBytes_ << " inflight=" << conn_.lossState.inflightBytes << " " << conn_; if (conn_.qLogger) { conn_.qLogger->addCongestionMetricUpdate( conn_.lossState.inflightBytes, getCongestionWindow(), kPersistentCongestion); } cwndBytes_ = conn_.transportSettings.minCwndInMss * conn_.udpSendPacketLen; if (conn_.pacer) { conn_.pacer->refreshPacingRate(cwndBytes_ * 2, conn_.lossState.srtt); } } } uint64_t Copa::getWritableBytes() const noexcept { if (conn_.lossState.inflightBytes > cwndBytes_) { return 0; } else { return cwndBytes_ - conn_.lossState.inflightBytes; } } uint64_t Copa::getCongestionWindow() const noexcept { return cwndBytes_; } bool Copa::inSlowStart() { return isSlowStart_; } CongestionControlType Copa::type() const noexcept { return CongestionControlType::Copa; } uint64_t Copa::getBytesInFlight() const noexcept { return conn_.lossState.inflightBytes; } void Copa::setAppIdle(bool, TimePoint) noexcept { /* unsupported */ } void Copa::setAppLimited() { /* unsupported */ } bool Copa::isAppLimited() const noexcept { return false; // not supported } void Copa::getStats(CongestionControllerStats& stats) const { stats.copaStats.deltaParam = deltaParam_; stats.copaStats.useRttStanding = useRttStanding_; } } // namespace quic