quic/state/QuicStreamFunctions.cpp (379 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <quic/state/QuicStreamFunctions.h>
#include <quic/QuicException.h>
#include <quic/flowcontrol/QuicFlowController.h>
#include <quic/state/QuicStreamUtilities.h>
#include <folly/io/Cursor.h>
#include <algorithm>
namespace {
void prependToBuf(quic::Buf& buf, quic::Buf toAppend) {
if (buf) {
buf->prependChain(std::move(toAppend));
} else {
buf = std::move(toAppend);
}
}
} // namespace
namespace quic {
void writeDataToQuicStream(QuicStreamState& stream, Buf data, bool eof) {
auto neverWrittenBufMeta = (0 == stream.writeBufMeta.offset);
uint64_t len = 0;
if (data) {
len = data->computeChainDataLength();
}
// Once data is written to writeBufMeta, no more data can be written to
// writeBuffer. Write only an EOF is fine.
CHECK(neverWrittenBufMeta || len == 0);
if (len > 0) {
// We call this before updating the writeBuffer because we only want to
// write a blocked frame first time the stream becomes blocked
maybeWriteBlockAfterAPIWrite(stream);
}
stream.writeBuffer.append(std::move(data));
if (eof) {
auto bufferSize =
stream.writeBuffer.front() ? stream.writeBuffer.chainLength() : 0;
stream.finalWriteOffset = stream.currentWriteOffset + bufferSize;
}
updateFlowControlOnWriteToStream(stream, len);
stream.conn.streamManager->updateWritableStreams(stream);
}
void writeBufMetaToQuicStream(
QuicStreamState& stream,
const BufferMeta& data,
bool eof) {
if (data.length > 0) {
maybeWriteBlockAfterAPIWrite(stream);
}
auto realDataLength =
stream.currentWriteOffset + stream.writeBuffer.chainLength();
CHECK_GT(realDataLength, 0)
<< "Real data has to be written to a stream before any buffer meta is"
<< "written to it.";
if (stream.writeBufMeta.offset == 0) {
CHECK(!stream.finalWriteOffset.has_value())
<< "Buffer meta cannot be appended to a stream after we have seen EOM "
<< "in real data";
stream.writeBufMeta.offset = realDataLength;
}
stream.writeBufMeta.length += data.length;
if (eof) {
stream.finalWriteOffset =
stream.writeBufMeta.offset + stream.writeBufMeta.length;
stream.writeBufMeta.eof = true;
}
updateFlowControlOnWriteToStream(stream, data.length);
stream.conn.streamManager->updateWritableStreams(stream);
}
void writeDataToQuicStream(QuicCryptoStream& stream, Buf data) {
stream.writeBuffer.append(std::move(data));
}
void appendDataToReadBufferCommon(
QuicStreamLike& stream,
StreamBuffer buffer,
folly::Function<void(uint64_t, uint64_t)>&& connFlowControlVisitor) {
auto& readBuffer = stream.readBuffer;
auto it = readBuffer.begin();
auto bufferEndOffset = buffer.offset + buffer.data.chainLength();
folly::Optional<uint64_t> bufferEofOffset;
if (buffer.eof) {
bufferEofOffset = bufferEndOffset;
} else if (buffer.data.chainLength() == 0) {
VLOG(10) << "Empty stream without EOF";
return;
}
if (stream.finalReadOffset && bufferEofOffset &&
*stream.finalReadOffset != *bufferEofOffset) {
throw QuicTransportException(
"Invalid EOF", TransportErrorCode::FINAL_SIZE_ERROR);
} else if (bufferEofOffset) {
// Do some consistency checks on the stream.
if (stream.maxOffsetObserved > *bufferEofOffset) {
throw QuicTransportException(
"EOF in middle of stream", TransportErrorCode::FINAL_SIZE_ERROR);
}
stream.finalReadOffset = bufferEofOffset;
} else if (stream.finalReadOffset) {
// We did not receive a segment with an EOF set.
if (buffer.offset + buffer.data.chainLength() > *stream.finalReadOffset) {
throw QuicTransportException(
"Invalid data after EOF", TransportErrorCode::FINAL_SIZE_ERROR);
}
}
// Update the flow control information before changing max offset observed on
// the stream.
connFlowControlVisitor(stream.maxOffsetObserved, bufferEndOffset);
stream.maxOffsetObserved =
std::max(stream.maxOffsetObserved, bufferEndOffset);
if (buffer.data.chainLength() == 0) {
// Nothing more to do since we already processed the EOF
// case.
return;
}
if (buffer.offset < stream.currentReadOffset) {
// trim the buffer to start from stream read offset.
buffer.data.trimStartAtMost(stream.currentReadOffset - buffer.offset);
buffer.offset = stream.currentReadOffset;
if (buffer.data.chainLength() == 0) {
return;
}
}
// Nothing in the buffer, just append it.
if (it == readBuffer.end()) {
readBuffer.emplace_back(std::move(buffer));
return;
}
// Start overlap will point to the first buffer that overlaps with the
// current buffer and End overlap will point to the last buffer that overlaps.
// They must always be set together.
folly::Optional<std::deque<StreamBuffer>::iterator> startOverlap;
folly::Optional<std::deque<StreamBuffer>::iterator> endOverlap;
StreamBuffer* current = &buffer;
bool currentAlreadyInserted = false;
bool done = false;
it = std::lower_bound(
it,
readBuffer.end(),
current->offset,
[](const StreamBuffer& listValue, uint64_t offset) {
// First element where the end offset is > start offset of the buffer.
return (listValue.offset + listValue.data.chainLength()) < offset;
});
// The invariant we're trying to maintain here is that individual
// elements of the readBuffer are assuredly non contiguous sections
// of the stream.
for (; it != readBuffer.end() && !done; ++it) {
auto currentEnd = current->offset + current->data.chainLength();
auto itEnd = it->offset + it->data.chainLength();
if (current->offset == it->offset && currentEnd == itEnd) {
// Exact overlap. Done.
done = true;
} else if (current->offset >= it->offset && currentEnd <= itEnd) {
// Subset overlap
done = true;
} else if (
current->offset <= it->offset && currentEnd >= it->offset &&
currentEnd <= itEnd) {
// Left overlap. Done.
it->data.trimStartAtMost(currentEnd - it->offset);
if (it->data.chainLength() > 0) {
current->data.append(it->data.move());
}
if (!startOverlap) {
startOverlap = it;
}
endOverlap = it + 1;
done = true;
} else if (current->offset < it->offset && currentEnd < it->offset) {
// Left, no overlap. Done.
if (!startOverlap) {
startOverlap = it;
endOverlap = it;
}
done = true;
} else if (current->offset <= it->offset && currentEnd > it->offset) {
// Complete overlap. Need to move on.
if (!startOverlap) {
startOverlap = it;
}
endOverlap = it + 1;
} else if (
current->offset >= it->offset && current->offset <= itEnd &&
currentEnd > itEnd) {
// Right overlap. Not done.
current->data.trimStartAtMost(itEnd - current->offset);
it->data.append(current->data.move());
current = &(*it);
currentAlreadyInserted = true;
DCHECK(!startOverlap);
startOverlap = it + 1;
endOverlap = it + 1;
}
}
// Could have also been completely to the right of the last element.
if (startOverlap && !currentAlreadyInserted) {
DCHECK(endOverlap);
DCHECK(
*startOverlap != readBuffer.end() || *endOverlap == readBuffer.end());
auto insertIt = readBuffer.erase(*startOverlap, *endOverlap);
readBuffer.emplace(insertIt, std::move(*current));
return;
} else if (currentAlreadyInserted) {
DCHECK(startOverlap);
DCHECK(endOverlap);
DCHECK(
*startOverlap != readBuffer.end() || *endOverlap == readBuffer.end());
readBuffer.erase(*startOverlap, *endOverlap);
return;
}
auto last = readBuffer.end() - 1;
if (current->offset > last->offset + last->data.chainLength()) {
readBuffer.emplace_back(std::move(*current));
}
}
void appendDataToReadBuffer(QuicStreamState& stream, StreamBuffer buffer) {
appendDataToReadBufferCommon(
stream,
std::move(buffer),
[&stream](uint64_t previousMaxOffsetObserved, uint64_t bufferEndOffset) {
updateFlowControlOnStreamData(
stream, previousMaxOffsetObserved, bufferEndOffset);
});
}
void appendDataToReadBuffer(QuicCryptoStream& stream, StreamBuffer buffer) {
appendDataToReadBufferCommon(
stream, std::move(buffer), [](uint64_t, uint64_t) {});
}
std::pair<Buf, bool> readDataInOrderFromReadBuffer(
QuicStreamLike& stream,
uint64_t amount,
bool sinkData) {
auto remaining = amount;
bool eof = false;
Buf data;
while ((amount == 0 || remaining != 0) && !stream.readBuffer.empty()) {
auto curr = stream.readBuffer.begin();
if (curr->offset > stream.currentReadOffset) {
// The buffer is sorted in order of the left edge of the range,
// if we find an item that is beyond the one we needed to read,
// we should quit.
break;
}
size_t currSize = curr->data.chainLength();
// In the algorithm for the append function, we maintain the invariant that
// the individual ranges are non-overlapping, thus if we get to this point,
// we must have an offset which matches the read offset.
CHECK_EQ(curr->offset, stream.currentReadOffset);
uint64_t toRead =
std::min<uint64_t>(currSize, amount == 0 ? currSize : remaining);
std::unique_ptr<folly::IOBuf> splice;
if (sinkData) {
curr->data.trimStart(toRead);
} else {
splice = curr->data.splitAtMost(toRead);
DCHECK_EQ(splice->computeChainDataLength(), toRead);
}
curr->offset += toRead;
if (curr->data.chainLength() == 0) {
eof = curr->eof;
stream.readBuffer.pop_front();
}
if (!sinkData) {
prependToBuf(data, std::move(splice));
}
if (amount != 0) {
remaining -= toRead;
}
stream.currentReadOffset += toRead;
}
return std::make_pair(std::move(data), eof);
}
Buf readDataFromCryptoStream(QuicCryptoStream& stream, uint64_t amount) {
return readDataInOrderFromReadBuffer(stream, amount).first;
}
std::pair<Buf, bool> readDataFromQuicStream(
QuicStreamState& stream,
uint64_t amount) {
auto eof = stream.finalReadOffset &&
stream.currentReadOffset >= *stream.finalReadOffset;
if (eof) {
if (stream.currentReadOffset == *stream.finalReadOffset) {
stream.currentReadOffset += 1;
}
stream.conn.streamManager->updateReadableStreams(stream);
stream.conn.streamManager->updatePeekableStreams(stream);
return std::make_pair(nullptr, true);
}
uint64_t lastReadOffset = stream.currentReadOffset;
Buf data;
std::tie(data, eof) = readDataInOrderFromReadBuffer(stream, amount);
// Update flow control before handling eof as eof is not subject to flow
// control
updateFlowControlOnRead(stream, lastReadOffset, Clock::now());
eof = stream.finalReadOffset &&
stream.currentReadOffset == *stream.finalReadOffset;
if (eof) {
stream.currentReadOffset += 1;
}
stream.conn.streamManager->updateReadableStreams(stream);
stream.conn.streamManager->updatePeekableStreams(stream);
return std::make_pair(std::move(data), eof);
}
void peekDataFromQuicStream(
QuicStreamState& stream,
const folly::Function<void(StreamId id, const folly::Range<PeekIterator>&)
const>& peekCallback) {
if (peekCallback) {
peekCallback(
stream.id,
folly::Range<PeekIterator>(
stream.readBuffer.cbegin(), stream.readBuffer.size()));
}
}
/**
* Same as readDataFromQuicStream(),
* only releases existing data instead of returning it.
*/
void consumeDataFromQuicStream(QuicStreamState& stream, uint64_t amount) {
bool eof = stream.finalReadOffset &&
stream.currentReadOffset >= *stream.finalReadOffset;
if (eof) {
if (stream.currentReadOffset == *stream.finalReadOffset) {
stream.currentReadOffset++;
}
stream.conn.streamManager->updateReadableStreams(stream);
stream.conn.streamManager->updatePeekableStreams(stream);
return;
}
uint64_t lastReadOffset = stream.currentReadOffset;
readDataInOrderFromReadBuffer(stream, amount, true /* sinkData */);
// Update flow control before handling eof as eof is not subject to flow
// control
updateFlowControlOnRead(stream, lastReadOffset, Clock::now());
eof = stream.finalReadOffset &&
stream.currentReadOffset == *stream.finalReadOffset;
if (eof) {
stream.currentReadOffset += 1;
}
stream.conn.streamManager->updateReadableStreams(stream);
stream.conn.streamManager->updatePeekableStreams(stream);
}
bool allBytesTillFinAcked(const QuicStreamState& stream) {
/**
* All bytes are acked if the following conditions are met:
* 1. The app wrote a FIN
* 2. We wrote the fin out to the network
* 3. We have no bytes remaining to retransmit.
* 4. We have no bytes left to write
* 5. We have no bytes that are detected as lost.
*/
return stream.finalWriteOffset &&
stream.currentWriteOffset > *stream.finalWriteOffset &&
stream.retransmissionBuffer.empty() && stream.writeBuffer.empty() &&
stream.lossBuffer.empty();
}
void appendPendingStreamReset(
QuicConnectionStateBase& conn,
const QuicStreamState& stream,
ApplicationErrorCode errorCode) {
/*
* When BufMetas are written to the transport, but before they are written to
* the network, writeBufMeta.offset would be assigned a value >
* currentWriteOffset. For this reason, we can't simply use
* min(max(currentWriteOffset, writeBufMeta.offset), finalWriteOffset) as the
* final offset. We have to check if any BufMetas have been written to the
* network. If we simply use min(max(currentWriteOffset, writeBufMeta.offset),
* we risk using a value > peer's flow control limit.
*/
bool writeBufWritten = stream.writeBufMeta.offset &&
(stream.currentWriteOffset + stream.writeBuffer.chainLength() !=
stream.writeBufMeta.offset);
conn.pendingEvents.resets.emplace(
std::piecewise_construct,
std::forward_as_tuple(stream.id),
std::forward_as_tuple(
stream.id,
errorCode,
std::min(
writeBufWritten ? stream.writeBufMeta.offset
: stream.currentWriteOffset,
stream.finalWriteOffset.value_or(
std::numeric_limits<uint64_t>::max()))));
}
uint64_t getLargestWriteOffsetSeen(const QuicStreamState& stream) {
return stream.finalWriteOffset.value_or(
stream.currentWriteOffset + stream.writeBuffer.chainLength());
}
folly::Optional<uint64_t> getLargestWriteOffsetTxed(
const QuicStreamState& stream) {
// currentWriteOffset is really nextWriteOffset
// when 0, it indicates nothing has been written yet
if (stream.currentWriteOffset == 0) {
return folly::none;
}
return stream.currentWriteOffset - 1;
}
folly::Optional<uint64_t> getLargestDeliverableOffset(
const QuicStreamState& stream) {
// If the acked intervals is not empty, then the furthest acked interval
// starting at zero is the next offset. If there is no interval starting at
// zero then we cannot deliver any offsets.
if (stream.ackedIntervals.empty() ||
stream.ackedIntervals.front().start != 0) {
return folly::none;
}
return stream.ackedIntervals.front().end;
}
uint64_t getAckIntervalSetVersion(const QuicStreamState& stream) {
return stream.ackedIntervals.insertVersion();
}
uint64_t getNumPacketsTxWithNewData(const QuicStreamState& stream) {
return stream.numPacketsTxWithNewData;
}
QuicCryptoStream* getCryptoStream(
QuicCryptoState& cryptoState,
EncryptionLevel encryptionLevel) {
switch (encryptionLevel) {
case EncryptionLevel::Initial:
return &cryptoState.initialStream;
case EncryptionLevel::Handshake:
return &cryptoState.handshakeStream;
case EncryptionLevel::EarlyData:
// TODO: remove this when we implement EOED for
// draft-17.
return &cryptoState.handshakeStream;
case EncryptionLevel::AppData:
return &cryptoState.oneRttStream;
default:
LOG(FATAL) << "Unhandled EncryptionLevel";
}
folly::assume_unreachable();
}
void processCryptoStreamAck(
QuicCryptoStream& cryptoStream,
uint64_t offset,
uint64_t len) {
auto ackedBuffer = cryptoStream.retransmissionBuffer.find(offset);
if (ackedBuffer == cryptoStream.retransmissionBuffer.end() ||
ackedBuffer->second->offset != offset ||
ackedBuffer->second->data.chainLength() != len) {
// It's possible retransmissions of crypto data were canceled.
return;
}
cryptoStream.retransmissionBuffer.erase(ackedBuffer);
}
} // namespace quic