quic/state/StreamData.h (246 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <folly/container/F14Map.h>
#include <quic/QuicConstants.h>
#include <quic/codec/Types.h>
#include <quic/common/SmallVec.h>
#include <quic/dsr/DSRPacketizationRequestSender.h>
#include <quic/state/QuicPriorityQueue.h>
namespace quic {
/**
 * Describes a buffer by its length only, without carrying the actual data.
 * This is part of the public facing interface.
 *
 * This is experimental.
 */
struct BufferMeta {
  // Length of the buffer this object stands in for.
  size_t length;

  // Explicit, so a bare integer never converts to a BufferMeta implicitly.
  explicit BufferMeta(size_t lengthIn) : length{lengthIn} {}
};
/**
 * A write buffer representation without the actual data: just a length, a
 * starting offset and an EOF flag. This is used for write buffer management
 * in a stream.
 *
 * This is experimental.
 */
struct WriteBufferMeta {
  // Length of the range described by this meta.
  size_t length{0};
  // Offset at which the described range starts.
  size_t offset{0};
  // EOF flag attached to the described range.
  bool eof{false};

  // Produces an empty meta: zero length, zero offset, no EOF.
  WriteBufferMeta() = default;

  /**
   * Chainable helper for assembling a WriteBufferMeta. Every setter returns
   * the builder itself; fields that are never set keep their zero/false
   * defaults when build() is called.
   */
  struct Builder {
    Builder& setLength(size_t lengthIn) {
      length_ = lengthIn;
      return *this;
    }

    Builder& setOffset(size_t offsetIn) {
      offset_ = offsetIn;
      return *this;
    }

    Builder& setEOF(bool val) {
      eof_ = val;
      return *this;
    }

    // Creates a WriteBufferMeta from the values collected so far. The builder
    // is left unchanged and can be reused.
    WriteBufferMeta build() {
      return WriteBufferMeta(length_, offset_, eof_);
    }

   private:
    size_t length_{0};
    size_t offset_{0};
    bool eof_{false};
  };

  /**
   * Carves the first splitLen bytes off this meta and returns them as a new
   * WriteBufferMeta that starts at the current offset. This object is then
   * advanced past the carved range: offset grows and length shrinks by
   * splitLen.
   *
   * The returned piece is flagged eof only when this meta is flagged eof and
   * the piece takes all of its remaining length. This object's own eof flag
   * is not modified.
   *
   * CHECK-fails if splitLen is larger than length.
   */
  WriteBufferMeta split(size_t splitLen) {
    CHECK_GE(length, splitLen);
    const bool headTakesEof = eof && splitLen == length;
    WriteBufferMeta head(splitLen, offset, headTakesEof);
    length -= splitLen;
    offset += splitLen;
    return head;
  }

 private:
  // Field-wise constructor; reachable only through Builder::build() and
  // split().
  explicit WriteBufferMeta(size_t lengthIn, size_t offsetIn, bool eofIn)
      : length(lengthIn), offset(offsetIn), eof(eofIn) {}
};
/**
 * A run of stream data together with the offset it is associated with and an
 * EOF flag.
 *
 * The type is move-only: the move operations are explicitly defaulted, which
 * suppresses the implicit copy operations.
 */
struct StreamBuffer {
  // The buffered bytes.
  BufQueue data;
  // Offset associated with the start of data. Always set by the constructor.
  uint64_t offset;
  // EOF flag for this buffer.
  bool eof{false};

  // Takes ownership of dataIn and records the offset / EOF flag alongside it.
  StreamBuffer(Buf dataIn, uint64_t offsetIn, bool eofIn = false) noexcept
      : data(std::move(dataIn)), offset(offsetIn), eof(eofIn) {}

  StreamBuffer(StreamBuffer&&) = default;
  StreamBuffer& operator=(StreamBuffer&&) = default;
};
/**
 * Buffering and offset bookkeeping for a stream-like object: read/write
 * buffers, retransmission and loss buffers, acked intervals and the various
 * read/write offsets. QuicStreamState derives from this.
 */
struct QuicStreamLike {
  QuicStreamLike() = default;

  QuicStreamLike(QuicStreamLike&&) = default;

  virtual ~QuicStreamLike() = default;

  // List of bytes that have been read and buffered. We need to buffer
  // bytes in case we get bytes out of order.
  std::deque<StreamBuffer> readBuffer;

  // List of bytes that have been written to the QUIC layer.
  BufQueue writeBuffer{};

  // Stores a map of offset:buffers which have been written to the socket and
  // are currently un-acked. Each one represents one StreamFrame that was
  // written. We need to buffer these because these might be retransmitted in
  // the future. These are associated with the starting offset of the buffer.
  // Note: the offset in the StreamBuffer itself can be >= the offset on which
  // it is keyed due to partial reliability - when data is skipped the offset
  // in the StreamBuffer may be incremented, but the keyed offset must remain
  // the same so it can be removed from the buffer on ACK.
  folly::F14FastMap<uint64_t, std::unique_ptr<StreamBuffer>>
      retransmissionBuffer;

  // Container and interval-set types used for ACK tracking below.
  template <class T>
  using IntervalSetVec = SmallVec<T, 32, uint16_t>;
  using AckedIntervals = IntervalSet<uint64_t, 1, IntervalSetVec>;

  // Tracks intervals which we have received ACKs for. E.g. in the case of all
  // data being acked this would contain one interval from 0 -> the largest
  // offset ACKed. This allows us to track which delivery callbacks can be
  // called.
  AckedIntervals ackedIntervals;

  // Stores a list of buffers which have been marked as lost by the loss
  // detector. Each one represents one StreamFrame that was written.
  std::deque<StreamBuffer> lossBuffer;

  // Current offset of the start bytes in the write buffer.
  // This changes when we pop stuff off the writeBuffer.
  // In a non-DSR stream, when we are finished writing out all the bytes until
  // FIN, this will be one greater than finalWriteOffset.
  // When DSR is used, this still points to the starting bytes in the write
  // buffer. Its value won't change when WriteBufferMetas are appended and
  // sent for a stream.
  uint64_t currentWriteOffset{0};

  // The minimum offset that requires retransmission.
  // N.B. used in QUIC partial reliability
  uint64_t minimumRetransmittableOffset{0};

  // Offset of the next expected bytes that we need to read from
  // the read buffer.
  uint64_t currentReadOffset{0};

  // The smallest data offset that we expect the peer to send.
  // N.B. used in QUIC partial reliability
  uint64_t currentReceiveOffset{0};

  // Maximum byte offset observed on the stream.
  uint64_t maxOffsetObserved{0};

  // If an EOF is observed on the stream, the position of the EOF. It could be
  // either from FIN or RST. Right now we use one value to represent both FIN
  // and RST. We may split write EOF into two values in the future.
  // Read side eof offset.
  folly::Optional<uint64_t> finalReadOffset;

  // Current cumulative number of packets sent for this stream. It only counts
  // egress packets that contain a *new* STREAM frame for this stream.
  uint64_t numPacketsTxWithNewData{0};

  /*
   * Adds buf to lossBuffer, which is kept ordered by offset. If buf starts
   * exactly where the entry before its insertion point ends, its data is
   * appended to that entry, which also takes over buf's eof flag. Otherwise
   * buf is inserted as a new entry. No merge with the following entry is
   * attempted.
   *
   * buf must not be null.
   */
  void insertIntoLossBuffer(std::unique_ptr<StreamBuffer> buf) {
    // We assume here that we won't try to insert an overlapping buffer, as
    // that should never happen in the loss buffer.
    const auto insertPos = std::upper_bound(
        lossBuffer.begin(),
        lossBuffer.end(),
        buf->offset,
        [](auto wantedOffset, const auto& entry) {
          return wantedOffset < entry.offset;
        });
    // insertPos == begin() covers both "empty deque" and "no predecessor".
    if (insertPos != lossBuffer.begin()) {
      auto& predecessor = *std::prev(insertPos);
      const auto predecessorEnd =
          predecessor.offset + predecessor.data.chainLength();
      if (predecessorEnd == buf->offset) {
        predecessor.data.append(buf->data.move());
        predecessor.eof = buf->eof;
        return;
      }
    }
    lossBuffer.insert(insertPos, std::move(*buf));
  }
};
struct QuicConnectionStateBase;
// Send-side state of a stream. QuicStreamState::inTerminalStates() treats
// Closed and Invalid as terminal; QuicStreamState::writable() requires Open.
enum class StreamSendState : uint8_t {
  Open,
  ResetSent,
  Closed,
  Invalid,
};
// Receive-side state of a stream. QuicStreamState::inTerminalStates() treats
// Closed and Invalid as terminal; QuicStreamState::shouldSendFlowControl()
// requires Open.
enum class StreamRecvState : uint8_t {
  Open,
  Closed,
  Invalid,
};
// Returns the name of a send-side stream state, or "Unknown" if the value
// does not match any enumerator.
inline folly::StringPiece streamStateToString(StreamSendState state) {
  const char* name = "Unknown";
  // No default label on purpose, so the compiler can flag an unhandled
  // enumerator.
  switch (state) {
    case StreamSendState::Open:
      name = "Open";
      break;
    case StreamSendState::ResetSent:
      name = "ResetSent";
      break;
    case StreamSendState::Closed:
      name = "Closed";
      break;
    case StreamSendState::Invalid:
      name = "Invalid";
      break;
  }
  return name;
}
// Returns the name of a receive-side stream state, or "Unknown" if the value
// does not match any enumerator.
inline folly::StringPiece streamStateToString(StreamRecvState state) {
  const char* name = "Unknown";
  // No default label on purpose, so the compiler can flag an unhandled
  // enumerator.
  switch (state) {
    case StreamRecvState::Open:
      name = "Open";
      break;
    case StreamRecvState::Closed:
      name = "Closed";
      break;
    case StreamRecvState::Invalid:
      name = "Invalid";
      break;
  }
  return name;
}
struct QuicStreamState : public QuicStreamLike {
virtual ~QuicStreamState() override = default;
QuicStreamState(StreamId id, QuicConnectionStateBase& conn);
QuicStreamState(QuicStreamState&&) = default;
/**
* Constructor to migrate QuicStreamState to another
* QuicConnectionStateBase.
*/
QuicStreamState(QuicConnectionStateBase& connIn, QuicStreamState&& other)
: QuicStreamLike(std::move(other)), conn(connIn), id(other.id) {
// QuicStreamState fields
finalWriteOffset = other.finalWriteOffset;
flowControlState = other.flowControlState;
streamReadError = other.streamReadError;
streamWriteError = other.streamWriteError;
sendState = other.sendState;
recvState = other.recvState;
isControl = other.isControl;
lastHolbTime = other.lastHolbTime;
totalHolbTime = other.totalHolbTime;
holbCount = other.holbCount;
priority = other.priority;
dsrSender = std::move(other.dsrSender);
writeBufMeta = other.writeBufMeta;
retransmissionBufMetas = std::move(other.retransmissionBufMetas);
lossBufMetas = std::move(other.lossBufMetas);
}
// Connection that this stream is associated with.
QuicConnectionStateBase& conn;
// Stream id of the connection.
StreamId id;
// Write side eof offset. This represents only the final FIN offset.
folly::Optional<uint64_t> finalWriteOffset;
struct StreamFlowControlState {
uint64_t windowSize{0};
uint64_t advertisedMaxOffset{0};
uint64_t peerAdvertisedMaxOffset{0};
// Time at which the last flow control update was sent by the transport.
folly::Optional<TimePoint> timeOfLastFlowControlUpdate;
};
StreamFlowControlState flowControlState;
// Stream level read error occured.
folly::Optional<QuicErrorCode> streamReadError;
// Stream level write error occured.
folly::Optional<QuicErrorCode> streamWriteError;
// State machine data
StreamSendState sendState{StreamSendState::Open};
// State machine data
StreamRecvState recvState{StreamRecvState::Open};
// Tells whether this stream is a control stream.
// It is set by the app via setControlStream and the transport can use this
// knowledge for optimizations e.g. for setting the app limited state on
// congestion control with control streams still active.
bool isControl{false};
// The last time we detected we were head of line blocked on the stream.
folly::Optional<Clock::time_point> lastHolbTime;
// The total amount of time we are head line blocked on the stream.
std::chrono::microseconds totalHolbTime{0us};
// Number of times the stream has entered the HOLB state
// lastHolbTime indicates whether the stream is HOL blocked at the moment.
uint32_t holbCount{0};
Priority priority{kDefaultPriority};
// Returns true if both send and receive state machines are in a terminal
// state
bool inTerminalStates() const {
bool sendInTerminalState = sendState == StreamSendState::Closed ||
sendState == StreamSendState::Invalid;
bool recvInTerminalState = recvState == StreamRecvState::Closed ||
recvState == StreamRecvState::Invalid;
return sendInTerminalState && recvInTerminalState;
}
// If the stream is still writable.
bool writable() const {
return sendState == StreamSendState::Open && !finalWriteOffset.has_value();
}
bool shouldSendFlowControl() const {
return recvState == StreamRecvState::Open;
}
// If the stream has writable data that's not backed by DSR. That is, in a
// regular stream write, it will be able to write something. So it either
// needs to have writeBuffer, or it has EOF to send.
bool hasWritableData() const {
if (!writeBuffer.empty()) {
return flowControlState.peerAdvertisedMaxOffset - currentWriteOffset > 0;
}
if (finalWriteOffset) {
/**
* This is the case that EOF/FIN is the only thing we can write in a
* non-DSR write for a stream. It's actually OK to send out a FIN with
* correct offset before we send out DSRed bytes. Peer is supposed to be
* able to handle this. But it's also not hard to limit it. So here i'm
* gonna go with the safer path: do not write FIN only stream frame if we
* still have BufMetas to send.
*/
return writeBufMeta.length == 0 &&
currentWriteOffset <= *finalWriteOffset &&
writeBufMeta.offset <= *finalWriteOffset;
}
return false;
}
FOLLY_NODISCARD bool hasWritableBufMeta() const {
if (writeBufMeta.offset == 0) {
return false;
}
if (writeBufMeta.length > 0) {
return flowControlState.peerAdvertisedMaxOffset - writeBufMeta.offset > 0;
}
if (finalWriteOffset) {
return writeBufMeta.offset <= *finalWriteOffset;
}
return false;
}
FOLLY_NODISCARD bool hasSentFIN() const {
if (!finalWriteOffset) {
return false;
}
return currentWriteOffset > *finalWriteOffset ||
writeBufMeta.offset > *finalWriteOffset;
}
FOLLY_NODISCARD uint64_t nextOffsetToWrite() const {
// The stream has never had WriteBufferMetas. Then currentWriteOffset
// always points to the next offset we send. This of course relies on the
// current contract of DSR: Real data always comes first. This code (and a
// lot other code) breaks when that contract is breached.
if (writeBufMeta.offset == 0) {
return currentWriteOffset;
}
if (!writeBuffer.empty()) {
return currentWriteOffset;
}
return writeBufMeta.offset;
}
bool hasReadableData() const {
return (readBuffer.size() > 0 &&
currentReadOffset == readBuffer.front().offset) ||
(finalReadOffset && currentReadOffset == *finalReadOffset);
}
bool hasPeekableData() const {
return readBuffer.size() > 0;
}
std::unique_ptr<DSRPacketizationRequestSender> dsrSender;
// BufferMeta that has been writen to the QUIC layer.
// When offset is 0, nothing has been written to it. On first write, its
// starting offset will be currentWriteOffset + writeBuffer.chainLength().
WriteBufferMeta writeBufMeta;
// A map to store sent WriteBufferMetas for potential retransmission.
folly::F14FastMap<uint64_t, WriteBufferMeta> retransmissionBufMetas;
// WriteBufferMetas that's already marked lost. They will be retransmitted.
std::deque<WriteBufferMeta> lossBufMetas;
/**
* Insert a new WriteBufferMeta into lossBufMetas. If the new WriteBufferMeta
* can be append to an existing WriteBufferMeta, it will be appended. Note
* it won't be prepended to an existing WriteBufferMeta. And it will also not
* merge 3 WriteBufferMetas together if the new one happens to fill up a hole
* between 2 existing WriteBufferMetas.
*/
void insertIntoLossBufMeta(WriteBufferMeta bufMeta) {
auto lossItr = std::upper_bound(
lossBufMetas.begin(),
lossBufMetas.end(),
bufMeta.offset,
[](auto offset, const auto& bufMeta) {
return offset < bufMeta.offset;
});
if (!lossBufMetas.empty() && lossItr != lossBufMetas.begin() &&
std::prev(lossItr)->offset + std::prev(lossItr)->length ==
bufMeta.offset) {
std::prev(lossItr)->length += bufMeta.length;
std::prev(lossItr)->eof = bufMeta.eof;
} else {
lossBufMetas.insert(lossItr, bufMeta);
}
}
};
} // namespace quic