in quic/dsr/frontend/Scheduler.cpp [30:111]
DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream(
DSRPacketBuilderBase& builder) {
SchedulingResult result;
auto& writableDSRStreams = conn_.streamManager->writableDSRStreams();
const auto& levelIter = std::find_if(
writableDSRStreams.levels.cbegin(),
writableDSRStreams.levels.cend(),
[&](const auto& level) { return !level.streams.empty(); });
if (levelIter == writableDSRStreams.levels.cend()) {
return result;
}
auto streamId = levelIter->streams.cbegin();
auto stream = conn_.streamManager->findStream(*streamId);
CHECK(stream);
CHECK(stream->dsrSender);
result.sender = stream->dsrSender.get();
bool hasFreshBufMeta = stream->writeBufMeta.length > 0;
bool hasLossBufMeta = !stream->lossBufMetas.empty();
CHECK(hasFreshBufMeta || hasLossBufMeta);
if (hasLossBufMeta) {
SendInstruction::Builder instructionBuilder(conn_, *streamId);
auto encodedSize = writeDSRStreamFrame(
builder,
instructionBuilder,
*streamId,
stream->lossBufMetas.front().offset,
stream->lossBufMetas.front().length,
stream->lossBufMetas.front()
.length, // flowControlLen shouldn't be used to limit loss write
stream->lossBufMetas.front().eof,
stream->currentWriteOffset + stream->writeBuffer.chainLength());
if (encodedSize > 0) {
if (builder.remainingSpace() < encodedSize) {
return result;
}
enrichInstruction(instructionBuilder);
builder.addSendInstruction(instructionBuilder.build(), encodedSize);
result.writeSuccess = true;
return result;
}
}
if (!hasFreshBufMeta || builder.remainingSpace() == 0) {
return result;
}
// If we have fresh BufMeta to write, the offset cannot be 0. This is based on
// the current limit that some real data has to be written into the stream
// before BufMetas.
CHECK_NE(stream->writeBufMeta.offset, 0);
uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_);
if (connWritableBytes == 0) {
return result;
}
// When stream still has writeBuffer, getSendStreamFlowControlBytesWire counts
// from currentWriteOffset which isn't right for BufMetas.
auto streamFlowControlLen = std::min(
getSendStreamFlowControlBytesWire(*stream),
stream->flowControlState.peerAdvertisedMaxOffset -
stream->writeBufMeta.offset);
auto flowControlLen = std::min(streamFlowControlLen, connWritableBytes);
bool canWriteFin = stream->finalWriteOffset.has_value() &&
stream->writeBufMeta.length <= flowControlLen;
SendInstruction::Builder instructionBuilder(conn_, *streamId);
auto encodedSize = writeDSRStreamFrame(
builder,
instructionBuilder,
*streamId,
stream->writeBufMeta.offset,
stream->writeBufMeta.length,
flowControlLen,
canWriteFin,
stream->currentWriteOffset + stream->writeBuffer.chainLength());
if (encodedSize > 0) {
if (builder.remainingSpace() < encodedSize) {
return result;
}
enrichInstruction(instructionBuilder);
builder.addSendInstruction(instructionBuilder.build(), encodedSize);
result.writeSuccess = true;
return result;
}
return result;
}