quic/dsr/frontend/Scheduler.cpp (96 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <quic/dsr/frontend/Scheduler.h>
#include <quic/dsr/frontend/WriteCodec.h>
#include <quic/flowcontrol/QuicFlowController.h>
#include <quic/state/QuicStateFunctions.h>
namespace quic {
DSRStreamFrameScheduler::DSRStreamFrameScheduler(
QuicServerConnectionState& conn)
: conn_(conn) {}
bool DSRStreamFrameScheduler::hasPendingData() const {
return conn_.streamManager->hasDSRWritable() &&
getSendConnFlowControlBytesWire(conn_) > 0;
}
/**
* Note the difference between this and the regular StreamFrameScheduler.
* There is no current way of knowing if two streams can be DSR-ed from the
* same backend. Thus one SendInstruction can only have one stream. So this API
* only write a single stream.
*/
DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream(
DSRPacketBuilderBase& builder) {
SchedulingResult result;
auto& writableDSRStreams = conn_.streamManager->writableDSRStreams();
const auto& levelIter = std::find_if(
writableDSRStreams.levels.cbegin(),
writableDSRStreams.levels.cend(),
[&](const auto& level) { return !level.streams.empty(); });
if (levelIter == writableDSRStreams.levels.cend()) {
return result;
}
auto streamId = levelIter->streams.cbegin();
auto stream = conn_.streamManager->findStream(*streamId);
CHECK(stream);
CHECK(stream->dsrSender);
result.sender = stream->dsrSender.get();
bool hasFreshBufMeta = stream->writeBufMeta.length > 0;
bool hasLossBufMeta = !stream->lossBufMetas.empty();
CHECK(hasFreshBufMeta || hasLossBufMeta);
if (hasLossBufMeta) {
SendInstruction::Builder instructionBuilder(conn_, *streamId);
auto encodedSize = writeDSRStreamFrame(
builder,
instructionBuilder,
*streamId,
stream->lossBufMetas.front().offset,
stream->lossBufMetas.front().length,
stream->lossBufMetas.front()
.length, // flowControlLen shouldn't be used to limit loss write
stream->lossBufMetas.front().eof,
stream->currentWriteOffset + stream->writeBuffer.chainLength());
if (encodedSize > 0) {
if (builder.remainingSpace() < encodedSize) {
return result;
}
enrichInstruction(instructionBuilder);
builder.addSendInstruction(instructionBuilder.build(), encodedSize);
result.writeSuccess = true;
return result;
}
}
if (!hasFreshBufMeta || builder.remainingSpace() == 0) {
return result;
}
// If we have fresh BufMeta to write, the offset cannot be 0. This is based on
// the current limit that some real data has to be written into the stream
// before BufMetas.
CHECK_NE(stream->writeBufMeta.offset, 0);
uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_);
if (connWritableBytes == 0) {
return result;
}
// When stream still has writeBuffer, getSendStreamFlowControlBytesWire counts
// from currentWriteOffset which isn't right for BufMetas.
auto streamFlowControlLen = std::min(
getSendStreamFlowControlBytesWire(*stream),
stream->flowControlState.peerAdvertisedMaxOffset -
stream->writeBufMeta.offset);
auto flowControlLen = std::min(streamFlowControlLen, connWritableBytes);
bool canWriteFin = stream->finalWriteOffset.has_value() &&
stream->writeBufMeta.length <= flowControlLen;
SendInstruction::Builder instructionBuilder(conn_, *streamId);
auto encodedSize = writeDSRStreamFrame(
builder,
instructionBuilder,
*streamId,
stream->writeBufMeta.offset,
stream->writeBufMeta.length,
flowControlLen,
canWriteFin,
stream->currentWriteOffset + stream->writeBuffer.chainLength());
if (encodedSize > 0) {
if (builder.remainingSpace() < encodedSize) {
return result;
}
enrichInstruction(instructionBuilder);
builder.addSendInstruction(instructionBuilder.build(), encodedSize);
result.writeSuccess = true;
return result;
}
return result;
}
void DSRStreamFrameScheduler::enrichInstruction(
SendInstruction::Builder& builder) {
builder.setPacketNum(getNextPacketNum(conn_, PacketNumberSpace::AppData))
.setLargestAckedPacketNum(getAckState(conn_, PacketNumberSpace::AppData)
.largestAckedByPeer.value_or(0));
}
} // namespace quic