quic/state/QuicStreamManager.cpp (540 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <quic/state/QuicStreamManager.h>
#include <quic/state/QuicStreamUtilities.h>
#include <quic/state/StateData.h>
namespace quic {
/**
 * Maintains the head-of-line (HOL) blocking bookkeeping of a stream. Intended
 * to be invoked whenever new data arrives on the stream or data is read from
 * it.
 *
 * A stream that was not HOL blocked becomes HOL blocked when either
 *  1. new data arrives that cannot be read yet, or
 *  2. a read from the stream runs into a hole.
 *
 * A HOL-blocked stream stops being HOL blocked when either
 *  1. something arrives that allows reading from the stream again, or
 *  2. the peer resets the stream.
 */
static void updateHolBlockedTime(QuicStreamState& stream) {
  // The stream is HOL blocked only when data is buffered AND the first
  // buffered chunk does not start at the current read offset. An empty read
  // buffer means the stream may be waiting on data, but it is not HOL blocked.
  const bool holBlocked = !stream.readBuffer.empty() &&
      stream.currentReadOffset != stream.readBuffer.front().offset;

  if (holBlocked) {
    // Only latch the start time on the transition into the blocked state; a
    // stream that is already HOL blocked simply stays blocked.
    if (!stream.lastHolbTime) {
      stream.lastHolbTime = Clock::now();
      stream.holbCount++;
    }
    return;
  }

  // Not blocked (any more): if a blocked period was in progress, add it to the
  // running total and clear the latch.
  if (stream.lastHolbTime) {
    const auto blockedFor = Clock::now() - *stream.lastHolbTime;
    stream.totalHolbTime +=
        std::chrono::duration_cast<std::chrono::microseconds>(blockedFor);
    stream.lastHolbTime = folly::none;
  }
}
// A stream id counts as "unopened" when it is not below the next acceptable
// stream id, i.e. it has not been handed out / seen yet.
static bool isStreamUnopened(
    StreamId streamId,
    StreamId nextAcceptableStreamId) {
  const bool alreadySeen = streamId < nextAcceptableStreamId;
  return !alreadySeen;
}
// Opens the peer stream `streamId` and, implicitly, every peer stream id
// between `nextAcceptableStreamId` and `streamId` (stepping by
// detail::kStreamIncrement). Every id opened this way is inserted into
// `openStreams` and appended to `newStreams`, and `nextAcceptableStreamId` is
// advanced past `streamId`.
//
// Returns:
//  - LocalErrorCode::CREATING_EXISTING_STREAM if `streamId` is below
//    `nextAcceptableStreamId` (the stream is already opened or closed);
//    nothing is modified.
//  - LocalErrorCode::STREAM_LIMIT_EXCEEDED if `streamId >= maxStreamId`;
//    nothing is modified.
//  - LocalErrorCode::NO_ERROR once the streams have been opened.
static LocalErrorCode openPeerStreamIfNotClosed(
    StreamId streamId,
    folly::F14FastSet<StreamId>& openStreams,
    StreamId& nextAcceptableStreamId,
    StreamId maxStreamId,
    std::vector<StreamId>& newStreams) {
  if (streamId < nextAcceptableStreamId) {
    return LocalErrorCode::CREATING_EXISTING_STREAM;
  }
  if (streamId >= maxStreamId) {
    return LocalErrorCode::STREAM_LIMIT_EXCEEDED;
  }

  // The range [nextAcceptableStreamId, streamId] is inclusive on both ends, so
  // it contains one more stream than the number of increments separating the
  // two ids. Reserving for the exact count avoids a rehash/reallocation while
  // inserting the last element.
  const auto numNewStreams =
      (streamId - nextAcceptableStreamId) / detail::kStreamIncrement + 1;
  openStreams.reserve(openStreams.size() + numNewStreams);
  newStreams.reserve(newStreams.size() + numNewStreams);

  for (StreamId id = nextAcceptableStreamId; id <= streamId;
       id += detail::kStreamIncrement) {
    openStreams.emplace(id);
    newStreams.push_back(id);
  }

  // The first guard already established streamId >= nextAcceptableStreamId,
  // so the next acceptable id always moves forward here.
  nextAcceptableStreamId = streamId + detail::kStreamIncrement;
  return LocalErrorCode::NO_ERROR;
}
// Opens the local stream `streamId` and, implicitly, every local stream id
// between `nextAcceptableStreamId` and `streamId` (stepping by
// detail::kStreamIncrement). Every id opened this way is inserted into
// `openStreams`, and `nextAcceptableStreamId` is advanced past `streamId`.
//
// Returns:
//  - LocalErrorCode::CREATING_EXISTING_STREAM if `streamId` is below
//    `nextAcceptableStreamId`; nothing is modified.
//  - LocalErrorCode::STREAM_LIMIT_EXCEEDED if `streamId >= maxStreamId`;
//    nothing is modified.
//  - LocalErrorCode::NO_ERROR once the streams have been opened.
static LocalErrorCode openLocalStreamIfNotClosed(
    StreamId streamId,
    folly::F14FastSet<StreamId>& openStreams,
    StreamId& nextAcceptableStreamId,
    StreamId maxStreamId) {
  if (streamId < nextAcceptableStreamId) {
    return LocalErrorCode::CREATING_EXISTING_STREAM;
  }
  if (streamId >= maxStreamId) {
    return LocalErrorCode::STREAM_LIMIT_EXCEEDED;
  }

  // The range [nextAcceptableStreamId, streamId] is inclusive on both ends, so
  // it contains one more stream than the number of increments separating the
  // two ids. Reserve for the exact number of insertions.
  const auto numNewStreams =
      (streamId - nextAcceptableStreamId) / detail::kStreamIncrement + 1;
  openStreams.reserve(openStreams.size() + numNewStreams);

  for (StreamId id = nextAcceptableStreamId; id <= streamId;
       id += detail::kStreamIncrement) {
    openStreams.emplace(id);
  }

  // The first guard already established streamId >= nextAcceptableStreamId,
  // so the next acceptable id always moves forward here.
  nextAcceptableStreamId = streamId + detail::kStreamIncrement;
  return LocalErrorCode::NO_ERROR;
}
// Reports whether `streamId` is currently present in the open-stream set that
// matches its initiator (local / peer) and its directionality.
bool QuicStreamManager::streamExists(StreamId streamId) {
  const bool local = isLocalStream(nodeType_, streamId);
  const bool unidirectional = isUnidirectionalStream(streamId);
  // Pick the one set out of four that this stream id can live in.
  const auto& openStreams = local
      ? (unidirectional ? openUnidirectionalLocalStreams_
                        : openBidirectionalLocalStreams_)
      : (unidirectional ? openUnidirectionalPeerStreams_
                        : openBidirectionalPeerStreams_);
  return openStreams.count(streamId) > 0;
}
// Looks up the already-instantiated state for `streamId`. Returns nullptr when
// no state exists; never creates one.
QuicStreamState* QuicStreamManager::findStream(StreamId streamId) {
  auto it = streams_.find(streamId);
  return it != streams_.end() ? &it->second : nullptr;
}
void QuicStreamManager::setMaxLocalBidirectionalStreams(
uint64_t maxStreams,
bool force) {
if (maxStreams > kMaxMaxStreams) {
throw QuicTransportException(
"Attempt to set maxStreams beyond the max allowed.",
TransportErrorCode::STREAM_LIMIT_ERROR);
}
StreamId maxStreamId = maxStreams * detail::kStreamIncrement +
initialLocalBidirectionalStreamId_;
if (force || maxStreamId > maxLocalBidirectionalStreamId_) {
maxLocalBidirectionalStreamId_ = maxStreamId;
maxLocalBidirectionalStreamIdIncreased_ = true;
}
}
void QuicStreamManager::setMaxLocalUnidirectionalStreams(
uint64_t maxStreams,
bool force) {
if (maxStreams > kMaxMaxStreams) {
throw QuicTransportException(
"Attempt to set maxStreams beyond the max allowed.",
TransportErrorCode::STREAM_LIMIT_ERROR);
}
StreamId maxStreamId = maxStreams * detail::kStreamIncrement +
initialLocalUnidirectionalStreamId_;
if (force || maxStreamId > maxLocalUnidirectionalStreamId_) {
maxLocalUnidirectionalStreamId_ = maxStreamId;
maxLocalUnidirectionalStreamIdIncreased_ = true;
}
}
// Raises the limit on peer-initiated bidirectional streams. This entry point
// never forces the value, so the limit can only increase through it.
void QuicStreamManager::setMaxRemoteBidirectionalStreams(uint64_t maxStreams) {
  constexpr bool kForceUpdate = false;
  setMaxRemoteBidirectionalStreamsInternal(maxStreams, kForceUpdate);
}
// Raises the limit on peer-initiated unidirectional streams. This entry point
// never forces the value, so the limit can only increase through it.
void QuicStreamManager::setMaxRemoteUnidirectionalStreams(uint64_t maxStreams) {
  constexpr bool kForceUpdate = false;
  setMaxRemoteUnidirectionalStreamsInternal(maxStreams, kForceUpdate);
}
void QuicStreamManager::setMaxRemoteBidirectionalStreamsInternal(
uint64_t maxStreams,
bool force) {
if (maxStreams > kMaxMaxStreams) {
throw QuicTransportException(
"Attempt to set maxStreams beyond the max allowed.",
TransportErrorCode::STREAM_LIMIT_ERROR);
}
StreamId maxStreamId = maxStreams * detail::kStreamIncrement +
initialRemoteBidirectionalStreamId_;
if (force || maxStreamId > maxRemoteBidirectionalStreamId_) {
maxRemoteBidirectionalStreamId_ = maxStreamId;
}
}
void QuicStreamManager::setMaxRemoteUnidirectionalStreamsInternal(
uint64_t maxStreams,
bool force) {
if (maxStreams > kMaxMaxStreams) {
throw QuicTransportException(
"Attempt to set maxStreams beyond the max allowed.",
TransportErrorCode::STREAM_LIMIT_ERROR);
}
StreamId maxStreamId = maxStreams * detail::kStreamIncrement +
initialRemoteUnidirectionalStreamId_;
if (force || maxStreamId > maxRemoteUnidirectionalStreamId_) {
maxRemoteUnidirectionalStreamId_ = maxStreamId;
}
}
// Read-and-clear accessor: returns whether the local bidirectional stream
// limit was written since the last call, and resets the flag.
bool QuicStreamManager::consumeMaxLocalBidirectionalStreamIdIncreased() {
  const bool wasIncreased = maxLocalBidirectionalStreamIdIncreased_;
  if (wasIncreased) {
    maxLocalBidirectionalStreamIdIncreased_ = false;
  }
  return wasIncreased;
}
// Read-and-clear accessor: returns whether the local unidirectional stream
// limit was written since the last call, and resets the flag.
bool QuicStreamManager::consumeMaxLocalUnidirectionalStreamIdIncreased() {
  const bool wasIncreased = maxLocalUnidirectionalStreamIdIncreased_;
  if (wasIncreased) {
    maxLocalUnidirectionalStreamIdIncreased_ = false;
  }
  return wasIncreased;
}
// Changes the priority of stream `id` to (`level`, `incremental`).
//
// Returns true if the priority was changed; false if the stream has no state
// or already has exactly this priority.
//
// Throws QuicTransportException (STREAM_STATE_ERROR) if a non-control stream
// is missing from the priority-level map. Note that the stream's own priority
// field has already been updated at that point.
bool QuicStreamManager::setStreamPriority(
    StreamId id,
    PriorityLevel level,
    bool incremental) {
  auto* streamState = findStream(id);
  if (!streamState) {
    return false;
  }

  Priority requested(level, incremental);
  if (streamState->priority == requested) {
    // Nothing to do.
    return false;
  }
  streamState->priority = requested;

  // Control streams are not tracked in the priority-level map and do not
  // trigger observer notifications.
  if (!streamState->isControl) {
    auto levelIt = streamPriorityLevelsNoCtrl_.find(id);
    if (levelIt == streamPriorityLevelsNoCtrl_.end()) {
      throw QuicTransportException(
          "Active stream not in stream priority map",
          TransportErrorCode::STREAM_STATE_ERROR);
    }
    levelIt->second = requested.level;
    notifyStreamPriorityChanges();
  }

  // If this stream is already queued in one of the writable queues, update
  // the priority there as well.
  writableStreams_.updateIfExist(id, streamState->priority);
  writableDSRStreams_.updateIfExist(id, streamState->priority);
  return true;
}
void QuicStreamManager::refreshTransportSettings(
const TransportSettings& settings) {
transportSettings_ = &settings;
setMaxRemoteBidirectionalStreamsInternal(
transportSettings_->advertisedInitialMaxStreamsBidi, true);
setMaxRemoteUnidirectionalStreamsInternal(
transportSettings_->advertisedInitialMaxStreamsUni, true);
}
// Local streams are created lazily: a stream id can be present in the open
// local stream set without having any state allocated yet. If `streamId` is
// such an open local stream, this allocates its state, registers it in the
// priority map, records the new-stream stat and returns the state.
//
// Returns nullptr if `streamId` is not in the open local stream set (i.e. the
// stream is closed or was never opened).
//
// Throws QuicTransportException (STREAM_STATE_ERROR) if state for `streamId`
// already exists; callers are expected to have looked the stream up first.
QuicStreamState* FOLLY_NULLABLE
QuicStreamManager::getOrCreateOpenedLocalStream(StreamId streamId) {
  auto& openLocalStreams = isUnidirectionalStream(streamId)
      ? openUnidirectionalLocalStreams_
      : openBidirectionalLocalStreams_;
  if (!openLocalStreams.count(streamId)) {
    return nullptr;
  }

  // Allocate the state for the lazily created stream.
  auto it = streams_.emplace(
      std::piecewise_construct,
      std::forward_as_tuple(streamId),
      std::forward_as_tuple(streamId, conn_));
  if (!it.second) {
    // No stream was created, so the new-stream stat must not be recorded.
    throw QuicTransportException(
        "Creating an active stream", TransportErrorCode::STREAM_STATE_ERROR);
  }
  addToStreamPriorityMap(it.first->second);
  // Count the stream only once it has actually been created and registered,
  // in the same order the other stream-creation paths in this file use.
  QUIC_STATS(conn_.statsCallback, onNewQuicStream);
  return &it.first->second;
}
// Returns the state for `streamId`, creating it lazily where allowed.
//
// - Peer-initiated ids are delegated to getOrCreatePeerStream().
// - Local ids with existing state are returned directly (the app-idle state is
//   not re-evaluated on that path).
// - Local ids without state go through getOrCreateOpenedLocalStream(); a null
//   result for an id that has never been opened is an error.
//
// May return nullptr. Throws QuicTransportException (STREAM_STATE_ERROR) when
// asked for a local stream that has not been opened, plus whatever the
// delegated calls throw.
QuicStreamState* QuicStreamManager::getStream(StreamId streamId) {
  if (isRemoteStream(nodeType_, streamId)) {
    auto* peerStream = getOrCreatePeerStream(streamId);
    updateAppIdleState();
    return peerStream;
  }

  auto existing = streams_.find(streamId);
  if (existing != streams_.end()) {
    return &existing->second;
  }

  auto* localStream = getOrCreateOpenedLocalStream(streamId);
  if (!localStream) {
    const auto nextAcceptable = isUnidirectionalStream(streamId)
        ? nextAcceptableLocalUnidirectionalStreamId_
        : nextAcceptableLocalBidirectionalStreamId_;
    if (isStreamUnopened(streamId, nextAcceptable)) {
      throw QuicTransportException(
          "Trying to get unopened local stream",
          TransportErrorCode::STREAM_STATE_ERROR);
    }
  }
  updateAppIdleState();
  return localStream;
}
// Creates the next locally initiated bidirectional stream. The "next id"
// cursor only advances when creation succeeded; on failure the error from
// createStream() is returned unchanged.
folly::Expected<QuicStreamState*, LocalErrorCode>
QuicStreamManager::createNextBidirectionalStream() {
  auto created = createStream(nextBidirectionalStreamId_);
  if (!created.hasValue()) {
    return created;
  }
  nextBidirectionalStreamId_ += detail::kStreamIncrement;
  return created;
}
// Creates the next locally initiated unidirectional stream. The "next id"
// cursor only advances when creation succeeded; on failure the error from
// createStream() is returned unchanged.
folly::Expected<QuicStreamState*, LocalErrorCode>
QuicStreamManager::createNextUnidirectionalStream() {
  auto created = createStream(nextUnidirectionalStreamId_);
  if (!created.hasValue()) {
    return created;
  }
  nextUnidirectionalStreamId_ += detail::kStreamIncrement;
  return created;
}
// Returns the state of the peer-initiated stream `streamId`, opening the
// stream and/or allocating its state lazily when needed.
//
// Invariants maintained here:
//  1. Stream ids below the next acceptable peer stream id have been seen
//     before; everything at or above it can still be opened.
//  2. A stream that has been seen before has an entry in the open peer stream
//     set; a seen stream without such an entry is closed.
//  3. Opening stream id n marks all lower ids of the same type as seen.
//
// Returns nullptr when the stream has been seen before but is no longer open.
// Throws QuicTransportException with STREAM_STATE_ERROR for an id that is not
// a valid peer stream id for this node type, and with STREAM_LIMIT_ERROR when
// the id is beyond the current peer stream limit.
QuicStreamState* FOLLY_NULLABLE
QuicStreamManager::getOrCreatePeerStream(StreamId streamId) {
  // Validate that the id really denotes a stream initiated by the peer.
  if (nodeType_ == QuicNodeType::Client && isClientStream(streamId)) {
    throw QuicTransportException(
        "Attempted getting client peer stream on client",
        TransportErrorCode::STREAM_STATE_ERROR);
  }
  if (nodeType_ == QuicNodeType::Server && isServerStream(streamId)) {
    throw QuicTransportException(
        "Attempted getting server peer stream on server",
        TransportErrorCode::STREAM_STATE_ERROR);
  }
  if (!isClientStream(streamId) && !isServerStream(streamId)) {
    throw QuicTransportException(
        "Invalid stream", TransportErrorCode::STREAM_STATE_ERROR);
  }

  // TODO when we can rely on C++17, this is a good candidate for try_emplace.
  auto existing = streams_.find(streamId);
  if (existing != streams_.end()) {
    return &existing->second;
  }

  // Allocates the stream state, registers it in the priority map and records
  // the new-stream stat. Shared by both creation paths below.
  const auto instantiateState = [this, streamId]() -> QuicStreamState* {
    auto inserted = streams_.emplace(
        std::piecewise_construct,
        std::forward_as_tuple(streamId),
        std::forward_as_tuple(streamId, conn_));
    addToStreamPriorityMap(inserted.first->second);
    QUIC_STATS(conn_.statsCallback, onNewQuicStream);
    return &inserted.first->second;
  };

  const bool unidirectional = isUnidirectionalStream(streamId);
  auto& openPeerStreams = unidirectional ? openUnidirectionalPeerStreams_
                                         : openBidirectionalPeerStreams_;
  if (openPeerStreams.count(streamId)) {
    // The stream is already open; only its state is missing.
    return instantiateState();
  }

  auto& nextAcceptableStreamId = unidirectional
      ? nextAcceptablePeerUnidirectionalStreamId_
      : nextAcceptablePeerBidirectionalStreamId_;
  const auto maxStreamId = unidirectional ? maxRemoteUnidirectionalStreamId_
                                          : maxRemoteBidirectionalStreamId_;
  const auto openedResult = openPeerStreamIfNotClosed(
      streamId,
      openPeerStreams,
      nextAcceptableStreamId,
      maxStreamId,
      newPeerStreams_);
  if (openedResult == LocalErrorCode::CREATING_EXISTING_STREAM) {
    // The stream was seen before and is not open any more, i.e. it could be
    // closed.
    return nullptr;
  }
  if (openedResult == LocalErrorCode::STREAM_LIMIT_EXCEEDED) {
    throw QuicTransportException(
        "Exceeded stream limit.", TransportErrorCode::STREAM_LIMIT_ERROR);
  }
  return instantiateState();
}
// Creates the locally initiated stream `streamId`.
//
// If the id is already open but has no state yet, the state is allocated and
// returned. Otherwise the stream (and all lower unopened local ids of the
// same type) is opened first; a failure to open is reported as the
// corresponding LocalErrorCode.
//
// Throws QuicTransportException (STREAM_STATE_ERROR) if `streamId` is not a
// stream id this node type may initiate.
folly::Expected<QuicStreamState*, LocalErrorCode>
QuicStreamManager::createStream(StreamId streamId) {
  if (nodeType_ == QuicNodeType::Client && !isClientStream(streamId)) {
    throw QuicTransportException(
        "Attempted creating non-client stream on client",
        TransportErrorCode::STREAM_STATE_ERROR);
  }
  if (nodeType_ == QuicNodeType::Server && !isServerStream(streamId)) {
    throw QuicTransportException(
        "Attempted creating non-server stream on server",
        TransportErrorCode::STREAM_STATE_ERROR);
  }

  if (auto* lazilyOpened = getOrCreateOpenedLocalStream(streamId)) {
    return lazilyOpened;
  }

  const bool unidirectional = isUnidirectionalStream(streamId);
  auto& openLocalStreams = unidirectional ? openUnidirectionalLocalStreams_
                                          : openBidirectionalLocalStreams_;
  auto& nextAcceptableStreamId = unidirectional
      ? nextAcceptableLocalUnidirectionalStreamId_
      : nextAcceptableLocalBidirectionalStreamId_;
  const auto maxStreamId = unidirectional ? maxLocalUnidirectionalStreamId_
                                          : maxLocalBidirectionalStreamId_;

  const auto openedResult = openLocalStreamIfNotClosed(
      streamId, openLocalStreams, nextAcceptableStreamId, maxStreamId);
  if (openedResult != LocalErrorCode::NO_ERROR) {
    return folly::makeUnexpected(openedResult);
  }

  auto inserted = streams_.emplace(
      std::piecewise_construct,
      std::forward_as_tuple(streamId),
      std::forward_as_tuple(streamId, conn_));
  auto& newStream = inserted.first->second;
  addToStreamPriorityMap(newStream);
  QUIC_STATS(conn_.statsCallback, onNewQuicStream);
  updateAppIdleState();
  return &newStream;
}
// Drops all bookkeeping for the stream `streamId`: every per-stream queue /
// set entry, the priority map entry (or the control stream count), the stream
// state itself and the open-stream set entry. A no-op if the stream has no
// state.
//
// For peer-initiated streams this also decides whether enough stream credit
// has accumulated to raise the peer's stream limit, and if so records the new
// limit as a pending stream limit update.
//
// Throws QuicTransportException (STREAM_STATE_ERROR) if a non-control stream
// is missing from the priority map.
void QuicStreamManager::removeClosedStream(StreamId streamId) {
  auto streamIt = streams_.find(streamId);
  if (streamIt == streams_.end()) {
    VLOG(10) << "Trying to remove already closed stream=" << streamId;
    return;
  }
  VLOG(10) << "Removing closed stream=" << streamId;
  DCHECK(streamIt->second.inTerminalStates());

  // Purge the stream from every per-stream collection.
  readableStreams_.erase(streamId);
  peekableStreams_.erase(streamId);
  writableStreams_.erase(streamId);
  writableDSRStreams_.erase(streamId);
  writableControlStreams_.erase(streamId);
  removeLoss(streamId);
  blockedStreams_.erase(streamId);
  deliverableStreams_.erase(streamId);
  txStreams_.erase(streamId);
  windowUpdates_.erase(streamId);
  stopSendingStreams_.erase(streamId);
  flowControlUpdated_.erase(streamId);

  if (streamIt->second.isControl) {
    // Control streams are counted instead of being tracked in the priority
    // map.
    DCHECK_GT(numControlStreams_, 0);
    numControlStreams_--;
  } else {
    const auto priorityIt = streamPriorityLevelsNoCtrl_.find(streamId);
    if (priorityIt == streamPriorityLevelsNoCtrl_.end()) {
      throw QuicTransportException(
          "Removed stream is not in the priority map",
          TransportErrorCode::STREAM_STATE_ERROR);
    }
    streamPriorityLevelsNoCtrl_.erase(priorityIt);
  }

  streams_.erase(streamIt);
  QUIC_STATS(conn_.statsCallback, onQuicStreamClosed);

  const bool unidirectional = isUnidirectionalStream(streamId);
  if (!isRemoteStream(nodeType_, streamId)) {
    auto& openLocalStreams = unidirectional ? openUnidirectionalLocalStreams_
                                            : openBidirectionalLocalStreams_;
    openLocalStreams.erase(streamId);
  } else {
    auto& openPeerStreams = unidirectional ? openUnidirectionalPeerStreams_
                                           : openBidirectionalPeerStreams_;
    openPeerStreams.erase(streamId);

    // Check if we should send a stream limit update. We need to send an
    // update every time we've closed a number of streams >= the set windowing
    // fraction.
    const uint64_t initialStreamLimit = unidirectional
        ? transportSettings_->advertisedInitialMaxStreamsUni
        : transportSettings_->advertisedInitialMaxStreamsBidi;
    const uint64_t streamWindow =
        initialStreamLimit / streamLimitWindowingFraction_;
    const uint64_t openableRemoteStreams = unidirectional
        ? openableRemoteUnidirectionalStreams()
        : openableRemoteBidirectionalStreams();
    // The "credit" here is how much available stream space we have based on
    // what the initial stream limit was set to.
    const uint64_t streamCredit =
        initialStreamLimit - openableRemoteStreams - openPeerStreams.size();

    if (streamCredit >= streamWindow) {
      if (unidirectional) {
        const uint64_t currentMaxStreams = (maxRemoteUnidirectionalStreamId_ -
                                            initialRemoteUnidirectionalStreamId_) /
            detail::kStreamIncrement;
        const uint64_t raisedLimit = currentMaxStreams + streamCredit;
        setMaxRemoteUnidirectionalStreams(raisedLimit);
        remoteUnidirectionalStreamLimitUpdate_ = raisedLimit;
      } else {
        const uint64_t currentMaxStreams = (maxRemoteBidirectionalStreamId_ -
                                            initialRemoteBidirectionalStreamId_) /
            detail::kStreamIncrement;
        const uint64_t raisedLimit = currentMaxStreams + streamCredit;
        setMaxRemoteBidirectionalStreams(raisedLimit);
        remoteBidirectionalStreamLimitUpdate_ = raisedLimit;
      }
    }
  }

  updateAppIdleState();
  notifyStreamPriorityChanges();
}
// Refreshes the HOL-blocking bookkeeping of `stream` and then makes its
// membership in the readable set match its state: a stream is listed when it
// has readable data or carries a read error.
void QuicStreamManager::updateReadableStreams(QuicStreamState& stream) {
  updateHolBlockedTime(stream);
  const bool shouldBeListed =
      stream.hasReadableData() || stream.streamReadError.has_value();
  if (!shouldBeListed) {
    readableStreams_.erase(stream.id);
    return;
  }
  readableStreams_.emplace(stream.id);
}
// Makes the writable / DSR-writable queue membership of `stream` match its
// current state.
//
// A stream with a write error is removed from both queues (its loss buffers
// are required to be empty at that point). Otherwise it is writable when it
// has writable data or pending loss data. DSR membership is evaluated for
// non-control streams only and additionally requires a DSR sender.
void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) {
  if (stream.streamWriteError.has_value()) {
    CHECK(stream.lossBuffer.empty());
    CHECK(stream.lossBufMetas.empty());
    removeWritable(stream);
    removeDSRWritable(stream);
    return;
  }

  const bool hasPendingData =
      stream.hasWritableData() || !stream.lossBuffer.empty();
  if (hasPendingData) {
    addWritable(stream);
  } else {
    removeWritable(stream);
  }

  // The DSR queue is left untouched for control streams.
  if (stream.isControl) {
    return;
  }

  const bool hasPendingBufMeta = stream.dsrSender &&
      (stream.hasWritableBufMeta() || !stream.lossBufMetas.empty());
  if (hasPendingBufMeta) {
    addDSRWritable(stream);
  } else {
    removeDSRWritable(stream);
  }
}
// Makes the membership of `stream` in the peekable set match its state.
// Because the PeekCallback API exposes peekError(), a stream carrying a read
// error is kept in the peekable set as well, not only one with peekable data.
void QuicStreamManager::updatePeekableStreams(QuicStreamState& stream) {
  const bool shouldBeListed =
      stream.hasPeekableData() || stream.streamReadError.has_value();
  if (!shouldBeListed) {
    peekableStreams_.erase(stream.id);
    return;
  }
  peekableStreams_.emplace(stream.id);
}
void QuicStreamManager::updateAppIdleState() {
bool currentNonCtrlStreams = hasNonCtrlStreams();
if (isAppIdle_ && !currentNonCtrlStreams) {
// We were app limited, and we continue to be app limited.
return;
} else if (!isAppIdle_ && currentNonCtrlStreams) {
// We were not app limited, and we continue to be not app limited.
return;
}
isAppIdle_ = !currentNonCtrlStreams;
if (conn_.congestionController) {
conn_.congestionController->setAppIdle(isAppIdle_, Clock::now());
}
}
// Marks `stream` as a control stream. On the first call for a given stream
// this bumps the control stream count and drops the stream from the
// priority-level map; repeated calls leave those untouched. The app-idle
// state is re-evaluated on every call.
void QuicStreamManager::setStreamAsControl(QuicStreamState& stream) {
  const bool alreadyControl = stream.isControl;
  if (!alreadyControl) {
    stream.isControl = true;
    numControlStreams_++;
    streamPriorityLevelsNoCtrl_.erase(stream.id);
  }
  updateAppIdleState();
}
// Returns the app-idle flag as last computed by updateAppIdleState().
bool QuicStreamManager::isAppIdle() const {
  const bool idle = isAppIdle_;
  return idle;
}
// Returns the highest priority level among all non-control streams. The
// highest priority is the numerically smallest level; kDefaultMaxPriority is
// returned when no tracked stream has a smaller level.
PriorityLevel QuicStreamManager::getHighestPriorityLevel() const {
  auto highest = kDefaultMaxPriority;
  for (auto it = streamPriorityLevelsNoCtrl_.begin();
       it != streamPriorityLevelsNoCtrl_.end();
       ++it) {
    if (it->second < highest) {
      highest = it->second;
    }
    if (highest == 0) {
      // Level 0 cannot be beaten; stop scanning.
      break;
    }
  }
  return highest;
}
// Installs `observer` as the receiver of stream priority change
// notifications, replacing any previous one. Only the raw pointer is stored.
void QuicStreamManager::setPriorityChangesObserver(
    QuicStreamPrioritiesObserver* observer) {
  this->priorityChangesObserver_ = observer;
}
// Detaches the priority changes observer, if one is installed.
void QuicStreamManager::resetPriorityChangesObserver() {
  if (priorityChangesObserver_) {
    priorityChangesObserver_ = nullptr;
  }
}
// Tells the installed observer that stream priorities changed; does nothing
// when no observer is installed.
void QuicStreamManager::notifyStreamPriorityChanges() {
  if (!priorityChangesObserver_) {
    return;
  }
  priorityChangesObserver_->onStreamPrioritiesChange();
}
void QuicStreamManager::addToStreamPriorityMap(
const QuicStreamState& streamState) {
if (streamState.isControl) {
return;
}
auto entry = streamPriorityLevelsNoCtrl_.emplace(
streamState.id, PriorityLevel(streamState.priority.level));
// Verify stream didn't already exist in streamPriorityLevelsNoCtrl_
if (!entry.second) {
throw QuicTransportException(
"Attempted to add stream already in priority map",
TransportErrorCode::STREAM_STATE_ERROR);
}
// Verify inserted item
if (entry.first->second != PriorityLevel(streamState.priority.level)) {
throw QuicTransportException(
"Failed to add stream to priority map",
TransportErrorCode::STREAM_STATE_ERROR);
}
// Notify observer (if set)
notifyStreamPriorityChanges();
}
} // namespace quic