quic/state/StateData.h (368 lines of code) (raw):

/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * This source code is licensed under the MIT license found in the * LICENSE file in the root directory of this source tree. */ #pragma once #include <quic/QuicConstants.h> #include <quic/api/Observer.h> #include <quic/codec/ConnectionIdAlgo.h> #include <quic/codec/QuicReadCodec.h> #include <quic/codec/QuicWriteCodec.h> #include <quic/codec/Types.h> #include <quic/common/BufAccessor.h> #include <quic/common/WindowedCounter.h> #include <quic/congestion_control/CongestionController.h> #include <quic/d6d/ProbeSizeRaiser.h> #include <quic/handshake/HandshakeLayer.h> #include <quic/logging/QLogger.h> #include <quic/state/AckEvent.h> #include <quic/state/AckStates.h> #include <quic/state/LossState.h> #include <quic/state/OutstandingPacket.h> #include <quic/state/PacketEvent.h> #include <quic/state/PendingPathRateLimiter.h> #include <quic/state/QuicConnectionStats.h> #include <quic/state/QuicStreamManager.h> #include <quic/state/QuicTransportStatsCallback.h> #include <quic/state/StreamData.h> #include <quic/state/TransportSettings.h> #include <folly/Optional.h> #include <folly/io/IOBuf.h> #include <folly/io/async/AsyncUDPSocket.h> #include <folly/io/async/DelayedDestruction.h> #include <folly/io/async/HHWheelTimer.h> #include <chrono> #include <list> #include <numeric> #include <queue> namespace quic { struct RecvmmsgStorage { // Storage for the recvmmsg system call. std::vector<struct mmsghdr> msgs; std::vector<struct sockaddr_storage> addrs; std::vector<struct iovec> iovecs; // Buffers we pass to recvmmsg. std::vector<Buf> readBuffers; // Free buffers which were not used in previous iterations. std::vector<Buf> freeBufs; void resize(size_t numPackets) { if (msgs.size() != numPackets) { msgs.resize(numPackets); addrs.resize(numPackets); readBuffers.resize(numPackets); iovecs.resize(numPackets); freeBufs.reserve(numPackets); } } }; struct NetworkData { TimePoint receiveTimePoint; std::vector<Buf> packets; size_t totalData{0}; NetworkData() = default; NetworkData(Buf&& buf, const TimePoint& receiveTime) : receiveTimePoint(receiveTime) { if (buf) { totalData = buf->computeChainDataLength(); packets.emplace_back(std::move(buf)); } } NetworkData(std::vector<Buf>&& packetBufs, const TimePoint& receiveTime) : receiveTimePoint(receiveTime), packets(std::move(packetBufs)), totalData(0) { for (const auto& buf : packets) { totalData += buf->computeChainDataLength(); } } std::unique_ptr<folly::IOBuf> moveAllData() && { std::unique_ptr<folly::IOBuf> buf; for (size_t i = 0; i < packets.size(); ++i) { if (buf) { buf->prependChain(std::move(packets[i])); } else { buf = std::move(packets[i]); } } return buf; } }; struct NetworkDataSingle { Buf data; TimePoint receiveTimePoint; size_t totalData{0}; NetworkDataSingle() = default; NetworkDataSingle( std::unique_ptr<folly::IOBuf> buf, const TimePoint& receiveTime) : data(std::move(buf)), receiveTimePoint(receiveTime) { if (data) { totalData += data->computeChainDataLength(); } } }; struct OutstandingsInfo { // Sent packets which have not been acked. These are sorted by PacketNum. std::deque<OutstandingPacket> packets; // All PacketEvents of this connection. If a OutstandingPacket doesn't have an // associatedEvent or if it's not in this set, there is no need to process its // frames upon ack or loss. folly::F14FastSet<PacketEvent, PacketEventHash> packetEvents; // Number of outstanding packets not including cloned EnumArray<PacketNumberSpace, uint64_t> packetCount{}; // Number of packets are clones or cloned. EnumArray<PacketNumberSpace, uint64_t> clonedPacketCount{}; // Number of packets currently declared lost. uint64_t declaredLostCount{0}; // Number of outstanding inflight DSR packet. That is, when a DSR packet is // declared lost, this counter will be decreased. uint64_t dsrCount{0}; // Number of packets outstanding and not declared lost. uint64_t numOutstanding() { return packets.size() - declaredLostCount; } // Total number of cloned packets. uint64_t numClonedPackets() { return clonedPacketCount[PacketNumberSpace::Initial] + clonedPacketCount[PacketNumberSpace::Handshake] + clonedPacketCount[PacketNumberSpace::AppData]; } }; struct Pacer { virtual ~Pacer() = default; /** * API for CongestionController to notify Pacer the latest cwnd value in bytes * and connection RTT so that Pacer can recalculate pacing rates. * Note the third parameter is here for testing purposes. */ virtual void refreshPacingRate( uint64_t cwndBytes, std::chrono::microseconds rtt, TimePoint currentTime = Clock::now()) = 0; /** * Set the pacers rate to the given value in Bytes per second */ virtual void setPacingRate(uint64_t rateBps) = 0; /** * Set an upper limit on the rate this pacer can use. * - If the pacer is currently using a faster pace, it will be brought down to * maxRateBytesPerSec. * - If refreshPacingRate or setPacingRate are called with a value * greated than maxRateBytesPerSec, maxRateBytesPerSec will be used instead. */ virtual void setMaxPacingRate(uint64_t maxRateBytesPerSec) = 0; /** * Resets the pacer, which should have the effect of the next write * happening immediately. */ virtual void reset() = 0; /** * Set the factor by which to multiply the RTT before determining the * inter-burst interval. E.g. a numerator of 1 and a denominator of 2 * would effectively double the pacing rate. */ virtual void setRttFactor(uint8_t numerator, uint8_t denominator) = 0; /** * API for Trnasport to query the interval before next write */ [[nodiscard]] virtual std::chrono::microseconds getTimeUntilNextWrite( TimePoint currentTime = Clock::now()) const = 0; /** * API for Transport to query a recalculated batch size based on currentTime * and previously scheduled write time. The batch size is how many packets the * transport can write out per eventbase loop. * * currentTime: The caller is expected to pass in a TimePoint value so that * the Pacer can compensate the timer drift. */ virtual uint64_t updateAndGetWriteBatchSize(TimePoint currentTime) = 0; /** * Getter API of the most recent write batch size. */ virtual uint64_t getCachedWriteBatchSize() const = 0; virtual void onPacketSent() = 0; virtual void onPacketsLoss() = 0; virtual void setExperimental(bool experimental) = 0; }; struct PacingRate { std::chrono::microseconds interval{0us}; uint64_t burstSize{0}; struct Builder { Builder&& setInterval(std::chrono::microseconds interval) &&; Builder&& setBurstSize(uint64_t burstSize) &&; PacingRate build() &&; private: std::chrono::microseconds interval_{0us}; uint64_t burstSize_{0}; }; private: PacingRate(std::chrono::microseconds interval, uint64_t burstSize); }; struct QuicCryptoStream : public QuicStreamLike { ~QuicCryptoStream() override = default; }; struct QuicCryptoState { // Stream to exchange the initial cryptographic material. QuicCryptoStream initialStream; // Stream to exchange the one rtt key material. QuicCryptoStream handshakeStream; // Stream to exchange handshake data encrypted with 1-rtt keys. QuicCryptoStream oneRttStream; }; struct ConnectionCloseEvent { TransportErrorCode errorCode; std::string reasonPhrase; PacketNum packetSequenceNum; }; struct RstStreamEvent { RstStreamEvent(StreamId id, uint64_t offset, ApplicationErrorCode error) : stream(id), byteOffset(offset), errorCode(error) {} StreamId stream; uint64_t byteOffset; ApplicationErrorCode errorCode; }; using Resets = folly::F14FastMap<StreamId, RstStreamFrame>; using FrameList = std::vector<QuicSimpleFrame>; class Logger; class CongestionControllerFactory; class LoopDetectorCallback; class PendingPathRateLimiter; struct ReadDatagram { ReadDatagram(TimePoint recvTimePoint, BufQueue data) : receiveTimePoint_{recvTimePoint}, buf_{std::move(data)} {} [[nodiscard]] TimePoint receiveTimePoint() const noexcept { return receiveTimePoint_; } [[nodiscard]] BufQueue& bufQueue() noexcept { return buf_; } [[nodiscard]] const BufQueue& bufQueue() const noexcept { return buf_; } // Move only to match BufQueue behavior ReadDatagram(ReadDatagram&& other) noexcept = default; ReadDatagram& operator=(ReadDatagram&& other) = default; ReadDatagram(const ReadDatagram&) = delete; ReadDatagram& operator=(const ReadDatagram&) = delete; private: TimePoint receiveTimePoint_; BufQueue buf_; }; struct QuicConnectionStateBase : public folly::DelayedDestruction { virtual ~QuicConnectionStateBase() override = default; explicit QuicConnectionStateBase(QuicNodeType type) : nodeType(type) { observers = std::make_shared<ObserverVec>(); } // Accessor to output buffer for continuous memory GSO writes BufAccessor* bufAccessor{nullptr}; std::unique_ptr<Handshake> handshakeLayer; // Crypto stream std::unique_ptr<QuicCryptoState> cryptoState; // Connection Congestion controller std::unique_ptr<CongestionController> congestionController; // Pacer std::unique_ptr<Pacer> pacer; // Congestion Controller factory to create specific impl of cc algorithm std::shared_ptr<CongestionControllerFactory> congestionControllerFactory; std::unique_ptr<QuicStreamManager> streamManager; // When server receives early data attempt without valid source address token, // server will limit bytes in flight to avoid amplification attack. // This limit should be cleared and set back to max after CFIN is received. folly::Optional<uint32_t> writableBytesLimit; std::unique_ptr<PendingPathRateLimiter> pathValidationLimiter; // Outstanding packets, packet events, and associated counters wrapped in one // class OutstandingsInfo outstandings; // The read codec to decrypt and decode packets. std::unique_ptr<QuicReadCodec> readCodec; // Initial header cipher. std::unique_ptr<PacketNumberCipher> initialHeaderCipher; // Handshake header cipher. std::unique_ptr<PacketNumberCipher> handshakeWriteHeaderCipher; // One rtt write header cipher. std::unique_ptr<PacketNumberCipher> oneRttWriteHeaderCipher; // Write cipher for 1-RTT data std::unique_ptr<Aead> oneRttWriteCipher; // Write cipher for packets with initial keys. std::unique_ptr<Aead> initialWriteCipher; // Write cipher for packets with handshake keys. std::unique_ptr<Aead> handshakeWriteCipher; // Time at which the connection started. TimePoint connectionTime; // The received active_connection_id_limit transport parameter from the peer. uint64_t peerActiveConnectionIdLimit{0}; // The destination connection id used in client's initial packet. folly::Optional<ConnectionId> clientChosenDestConnectionId; // The source connection id used in client's initial packet. folly::Optional<ConnectionId> clientConnectionId; // The current server chosen connection id. folly::Optional<ConnectionId> serverConnectionId; // Connection ids issued by self. std::vector<ConnectionIdData> selfConnectionIds; // Connection ids issued by peer - to be used as destination ids. std::vector<ConnectionIdData> peerConnectionIds; // ConnectionIdAlgo implementation to encode and decode ConnectionId with // various info, such as routing related info. ConnectionIdAlgo* connIdAlgo{nullptr}; // Negotiated version. folly::Optional<QuicVersion> version; // Original advertised version. Only meaningful to clients. // TODO: move to client only conn state. folly::Optional<QuicVersion> originalVersion; // Original address used by the peer when first establishing the connection. folly::SocketAddress originalPeerAddress; // Current peer address. folly::SocketAddress peerAddress; // Local address. INADDR_ANY if not set. folly::Optional<folly::SocketAddress> localAddress; // Local error on the connection. folly::Optional<QuicError> localConnectionError; // Error sent on the connection by the peer. folly::Optional<QuicError> peerConnectionError; // Supported versions in order of preference. Only meaningful to clients. // TODO: move to client only conn state. std::vector<QuicVersion> supportedVersions; // The endpoint attempts to create a new self connection id with sequence // number and stateless reset token for itself, and if successful, returns it // and updates the connection's state to ensure its peer can use it. virtual folly::Optional<ConnectionIdData> createAndAddNewSelfConnId() { return folly::none; } uint64_t nextSelfConnectionIdSequence{0}; // D6D related events struct PendingD6DEvents { // If we should schedule/cancel d6d raise timeout, if it's not // already scheduled/canceled bool scheduleRaiseTimeout{false}; // If we should schedule/cancel d6d probe timeout, if it's not // already scheduled/canceled bool scheduleProbeTimeout{false}; // To send a d6d probe packet bool sendProbePacket{false}; // The delay after which sendD6DProbePacket will be set folly::Optional<std::chrono::milliseconds> sendProbeDelay; }; struct PendingEvents { Resets resets; folly::Optional<PathChallengeFrame> pathChallenge; FrameList frames; // D6D related events PendingD6DEvents d6d; std::vector<KnobFrame> knobs; // Number of probing packets to send after PTO EnumArray<PacketNumberSpace, uint8_t> numProbePackets{}; FOLLY_NODISCARD bool anyProbePackets() const { return numProbePackets[PacketNumberSpace::Initial] + numProbePackets[PacketNumberSpace::Handshake] + numProbePackets[PacketNumberSpace::AppData]; } // true: schedule timeout if not scheduled // false: cancel scheduled timeout bool schedulePathValidationTimeout{false}; // If we should schedule a new Ack timeout, if it's not already scheduled bool scheduleAckTimeout{false}; // Whether a connection level window update is due to send bool connWindowUpdate{false}; // If there is a pending loss detection alarm update bool setLossDetectionAlarm{false}; bool cancelPingTimeout{false}; bool notifyPingReceived{false}; // close transport when the next packet number reaches kMaxPacketNum bool closeTransport{false}; // To send a ping frame bool sendPing{false}; // Do we need to send data blocked frame when connection is blocked. bool sendDataBlocked{false}; }; PendingEvents pendingEvents; LossState lossState; // This contains the ack and packet number related states for all three // packet number space. AckStates ackStates; struct ConnectionFlowControlState { // The size of the connection flow control window. uint64_t windowSize{0}; // The max data we have advertised to the peer. uint64_t advertisedMaxOffset{0}; // The max data the peer has advertised on the connection. // This is set to 0 initially so that we can't send any data until we know // the peer's flow control offset. uint64_t peerAdvertisedMaxOffset{0}; // The sum of the min(read offsets) of all the streams on the conn. uint64_t sumCurReadOffset{0}; // The sum of the max(offset) observed on all the streams on the conn. uint64_t sumMaxObservedOffset{0}; // The sum of write offsets of all the streams, only including the offsets // written on the wire. uint64_t sumCurWriteOffset{0}; // The sum of length of data in all the stream buffers. uint64_t sumCurStreamBufferLen{0}; // The packet number in which we got the last largest max data. folly::Optional<PacketNum> largestMaxOffsetReceived; // The following are advertised by the peer, and are set to zero initially // so that we cannot send any data until we know the peer values. // The initial max stream offset for peer-initiated bidirectional streams. uint64_t peerAdvertisedInitialMaxStreamOffsetBidiLocal{0}; // The initial max stream offset for local-initiated bidirectional streams. uint64_t peerAdvertisedInitialMaxStreamOffsetBidiRemote{0}; // The initial max stream offset for unidirectional streams. uint64_t peerAdvertisedInitialMaxStreamOffsetUni{0}; // Time at which the last flow control update was sent by the transport. folly::Optional<TimePoint> timeOfLastFlowControlUpdate; }; // Current state of flow control. ConnectionFlowControlState flowControlState; // The outstanding path challenge folly::Optional<PathChallengeFrame> outstandingPathValidation; // Settings for transports. TransportSettings transportSettings; // Value of the negotiated ack delay exponent. uint64_t peerAckDelayExponent{kDefaultAckDelayExponent}; // The value of the peer's min_ack_delay, for creating ACK_FREQUENCY frames. folly::Optional<std::chrono::microseconds> peerMinAckDelay; // Idle timeout advertised by the peer. Initially sets it to the maximum value // until the handshake sets the timeout. std::chrono::milliseconds peerIdleTimeout{kMaxIdleTimeout}; // The max UDP packet size we will be sending, limited by both the received // max_packet_size in Transport Parameters and PMTU uint64_t udpSendPacketLen{kDefaultUDPSendPacketLen}; // Peer-advertised max UDP payload size, stored as an opportunistic value to // use when receiving the forciblySetUdpPayloadSize transport knob param uint64_t peerMaxUdpPayloadSize{kDefaultUDPSendPacketLen}; struct PacketSchedulingState { StreamId nextScheduledControlStream{0}; }; PacketSchedulingState schedulingState; // Logger for this connection. std::shared_ptr<Logger> logger; // QLogger for this connection std::shared_ptr<QLogger> qLogger; // Track stats for various server events QuicTransportStatsCallback* statsCallback{nullptr}; // Meta state of d6d, mostly useful for analytics. D6D can operate without it. struct D6DMetaState { // Cumulative count of acked packets uint64_t totalAckedProbes{0}; // Cumulative count of lost packets uint64_t totalLostProbes{0}; // Cumulative count of transmitted packets uint64_t totalTxedProbes{0}; // Timepoint of when d6d reaches a non-search state // this helps us understand the convergence speed TimePoint timeLastNonSearchState; // Last non-search state D6DMachineState lastNonSearchState; }; struct D6DState { // The lastest d6d probe packet transmitted folly::Optional<D6DProbePacket> lastProbe; // The raise timeout std::chrono::seconds raiseTimeout{kDefaultD6DRaiseTimeout}; // The probe timeout std::chrono::seconds probeTimeout{kDefaultD6DProbeTimeout}; // The number of outstanding probe packets uint64_t outstandingProbes{0}; // The base PMTU to start probing with uint16_t basePMTU{kDefaultD6DBasePMTU}; // The max PMTU, determined by max_packet_size transport parameter uint16_t maxPMTU{kDefaultMaxUDPPayload}; // Current probe size, dynamically adjusted by the probing algorithm uint32_t currentProbeSize{kDefaultD6DBasePMTU}; // Probe size raiser std::unique_ptr<ProbeSizeRaiser> raiser; // ThresholdCounter to help detect PMTU blackhole std::unique_ptr<WindowedCounter<uint64_t, uint64_t>> thresholdCounter{ nullptr}; // Meta state D6DMetaState meta; // Turn off blackhole detection bool noBlackholeDetection{false}; // D6D Machine State D6DMachineState state{D6DMachineState::DISABLED}; }; D6DState d6d; // Debug information. Currently only used to debug busy loop of Transport // WriteLooper. struct WriteDebugState { bool needsWriteLoopDetect{false}; uint64_t currentEmptyLoopCount{0}; WriteDataReason writeDataReason{WriteDataReason::NO_WRITE}; NoWriteReason noWriteReason{NoWriteReason::WRITE_OK}; std::string schedulerName; }; struct ReadDebugState { uint64_t loopCount{0}; NoReadReason noReadReason{NoReadReason::READ_OK}; }; WriteDebugState writeDebugState; ReadDebugState readDebugState; std::shared_ptr<LoopDetectorCallback> loopDetectorCallback; // Measure rtt betwen pathchallenge & path response frame // Use this measured rtt as init rtt (from Transport Settings) TimePoint pathChallengeStartTime; /** * Eary data app params functions. */ folly::Function<bool(const folly::Optional<std::string>&, const Buf&) const> earlyDataAppParamsValidator; folly::Function<Buf()> earlyDataAppParamsGetter; /** * Selects a previously unused peer-issued connection id to use. * If there are no available ids return false and don't change anything. * Return true if replacement succeeds. */ bool retireAndSwitchPeerConnectionIds(); // queue of functions to be called in processCallbacksAfterNetworkData std::vector<std::function<void(QuicSocket*)>> pendingCallbacks; // Vector of Observers that are attached to this socket. std::shared_ptr<const ObserverVec> observers; // Recent ACK events, for use in processCallbacksAfterNetworkData. // Holds the ACK events generated during the last round of ACK processing. std::vector<AckEvent> lastProcessedAckEvents; // Type of node owning this connection (client or server). QuicNodeType nodeType; // Whether or not we received a new packet before a write. bool receivedNewPacketBeforeWrite{false}; // Whether we've set the transporot parameters from transportSettings yet. bool transportParametersEncoded{false}; // Whether a connection can be paced based on its handshake and close states. // For example, we may not want to pace a connection that's still handshaking. bool canBePaced{false}; // Flag indicating whether the socket is currently waiting for the app to // write data. All new sockets start off in this state where they wait for the // application to pump data to the socket. bool waitingForAppData{true}; // Monotonically increasing counter that is incremented each time there is a // write on this socket (writeSocketData() is called), This is used to // identify specific outstanding packets (based on writeCount and packetNum) // in the Observers, to construct Write Blocks uint64_t writeCount{0}; // Number of DSR packets sent by this connection. uint64_t dsrPacketCount{0}; // Whether we successfully used 0-RTT keys in this connection. bool usedZeroRtt{false}; struct DatagramState { uint16_t maxReadFrameSize{kDefaultMaxDatagramFrameSize}; uint16_t maxWriteFrameSize{kDefaultMaxDatagramFrameSize}; uint32_t maxReadBufferSize{kDefaultMaxDatagramsBuffered}; uint32_t maxWriteBufferSize{kDefaultMaxDatagramsBuffered}; // Buffers Incoming Datagrams std::deque<ReadDatagram> readBuffer; // Buffers Outgoing Datagrams std::deque<BufQueue> writeBuffer; }; DatagramState datagramState; }; std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st); struct AckStateVersion { uint64_t initialAckStateVersion{kDefaultIntervalSetVersion}; uint64_t handshakeAckStateVersion{kDefaultIntervalSetVersion}; uint64_t appDataAckStateVersion{kDefaultIntervalSetVersion}; AckStateVersion( uint64_t initialVersion, uint64_t handshakeVersion, uint64_t appDataVersion); bool operator==(const AckStateVersion& other) const; bool operator!=(const AckStateVersion& other) const; }; } // namespace quic