in quic/api/QuicTransportFunctions.cpp [1319:1442]
/**
 * Repeatedly builds, encrypts and batches packets for one packet number space
 * and writes them to `sock` through a batch writer.
 *
 * The loop runs while all of the following hold:
 *   - `scheduler.hasData()` is true,
 *   - the batch reports fewer than `packetLimit` packets sent, and
 *   - either the batch reports fewer than the batch size sent, or
 *     `writeLoopTimeLimit(writeLoopBeginTime, connection)` returns true.
 *
 * Per iteration the space available to the scheduler is
 * min(connection.udpSendPacketLen, writableBytesFunc(connection)) minus the
 * AEAD cipher overhead, clamped at zero.
 *
 * Every successfully built packet is passed to updateConnection(), whether or
 * not the subsequent socket write succeeded (see the comment in the loop).
 *
 * @param sock               Socket the batch writer sends on.
 * @param connection         Connection state; read for settings and updated
 *                           for each built packet.
 * @param srcConnId          Source connection id passed to `builder`.
 * @param dstConnId          Destination connection id passed to `builder`.
 * @param builder            Callable producing the packet header.
 * @param pnSpace            Packet number space to allocate numbers from.
 * @param scheduler          Source of frames for each packet.
 * @param writableBytesFunc  Returns the number of bytes the caller allows to
 *                           be written for `connection` right now.
 * @param packetLimit        Upper bound on packets reported sent by the batch.
 * @param aead               Cipher used for packet protection.
 * @param headerCipher       Cipher used for header protection.
 * @param version            QUIC version passed to `builder`.
 * @param writeLoopBeginTime Start time forwarded to writeLoopTimeLimit().
 * @param token              Token passed to `builder`.
 * @return A WriteQuicDataResult constructed from the batch's sent-packet
 *         count, a literal 0 for the second member, and the sum of encoded
 *         sizes of all packets built in this call.
 */
WriteQuicDataResult writeConnectionDataToSocket(
    folly::AsyncUDPSocket& sock,
    QuicConnectionStateBase& connection,
    const ConnectionId& srcConnId,
    const ConnectionId& dstConnId,
    HeaderBuilder builder,
    PacketNumberSpace pnSpace,
    QuicPacketScheduler& scheduler,
    const WritableBytesFunc& writableBytesFunc,
    uint64_t packetLimit,
    const Aead& aead,
    const PacketNumberCipher& headerCipher,
    QuicVersion version,
    TimePoint writeLoopBeginTime,
    const std::string& token) {
  VLOG(10) << nodeToString(connection.nodeType)
           << " writing data using scheduler=" << scheduler.name() << " "
           << connection;
  auto batchWriter = BatchWriterFactory::makeBatchWriter(
      sock,
      connection.transportSettings.batchingMode,
      connection.transportSettings.maxBatchSize,
      connection.transportSettings.useThreadLocalBatching,
      connection.transportSettings.threadLocalDelay,
      connection.transportSettings.dataPathType,
      connection);
  // Only non-server connections are downcast to the client state to reach
  // happyEyeballsState; servers pass nullptr.
  auto happyEyeballsState = connection.nodeType == QuicNodeType::Server
      ? nullptr
      : &static_cast<QuicClientConnectionState&>(connection).happyEyeballsState;
  IOBufQuicBatch ioBufBatch(
      std::move(batchWriter),
      connection.transportSettings.useThreadLocalBatching,
      sock,
      connection.peerAddress,
      connection.statsCallback,
      happyEyeballsState);
  // Debug state is only maintained when a loop detector is installed.
  if (connection.loopDetectorCallback) {
    connection.writeDebugState.schedulerName = scheduler.name().str();
    connection.writeDebugState.noWriteReason = NoWriteReason::WRITE_OK;
    if (!scheduler.hasData()) {
      connection.writeDebugState.noWriteReason = NoWriteReason::EMPTY_SCHEDULER;
    }
  }
  // Without batching, the per-call packet limit from the transport settings
  // plays the role of the batch size.
  auto batchSize = connection.transportSettings.batchingMode ==
          QuicBatchingMode::BATCHING_MODE_NONE
      ? connection.transportSettings.writeConnectionDataPacketsLimit
      : connection.transportSettings.maxBatchSize;
  uint64_t bytesWritten = 0;
  while (scheduler.hasData() && ioBufBatch.getPktSent() < packetLimit &&
         ((ioBufBatch.getPktSent() < batchSize) ||
          writeLoopTimeLimit(writeLoopBeginTime, connection))) {
    auto packetNum = getNextPacketNum(connection, pnSpace);
    auto header = builder(srcConnId, dstConnId, packetNum, version, token);
    uint32_t writableBytes = folly::to<uint32_t>(std::min<uint64_t>(
        connection.udpSendPacketLen, writableBytesFunc(connection)));
    // Reserve room for the AEAD overhead; never let the budget underflow.
    uint64_t cipherOverhead = aead.getCipherOverhead();
    if (writableBytes < cipherOverhead) {
      writableBytes = 0;
    } else {
      writableBytes -= cipherOverhead;
    }
    // Pick the build/schedule/encrypt routine matching the data path type.
    const auto& dataPlainFunc =
        connection.transportSettings.dataPathType == DataPathType::ChainedMemory
        ? iobufChainBasedBuildScheduleEncrypt
        : continuousMemoryBuildScheduleEncrypt;
    auto ret = dataPlainFunc(
        connection,
        std::move(header),
        pnSpace,
        packetNum,
        cipherOverhead,
        scheduler,
        writableBytes,
        ioBufBatch,
        aead,
        headerCipher);
    if (!ret.buildSuccess) {
      // Packets built in earlier iterations have already gone through
      // updateConnection() but may still be queued in the batch. Flush them
      // before bailing out, exactly as the normal exit path below does, so
      // they are not left unsent and the reported sent count reflects them.
      ioBufBatch.flush();
      return {ioBufBatch.getPktSent(), 0, bytesWritten};
    }
    // If we build a packet, we updateConnection(), even if write might have
    // been failed. Because if it builds, a lot of states need to be updated no
    // matter the write result. We are basically treating this case as if we
    // pretend write was also successful but packet is lost somewhere in the
    // network.
    bytesWritten += ret.encodedSize;
    auto& result = ret.result;
    updateConnection(
        connection,
        std::move(result->packetEvent),
        std::move(result->packet->packet),
        Clock::now(),
        folly::to<uint32_t>(ret.encodedSize),
        folly::to<uint32_t>(ret.encodedBodySize),
        false /* isDSRPacket */);
    // if ioBufBatch.write returns false
    // it is because a flush() call failed
    if (!ret.writeSuccess) {
      if (connection.loopDetectorCallback) {
        connection.writeDebugState.noWriteReason =
            NoWriteReason::SOCKET_FAILURE;
      }
      // A flush has just failed, so no further flush is attempted here.
      return {ioBufBatch.getPktSent(), 0, bytesWritten};
    }
  }
  // Normal exit: push out whatever is still queued in the batch.
  ioBufBatch.flush();
  if (connection.transportSettings.dataPathType ==
      DataPathType::ContinuousMemory) {
    // After the flush the accessor must own the buffer again and the buffer
    // must be empty with no headroom; obtain/release is only used to inspect
    // it.
    CHECK(connection.bufAccessor->ownsBuffer());
    auto buf = connection.bufAccessor->obtain();
    CHECK(buf->length() == 0 && buf->headroom() == 0);
    connection.bufAccessor->release(std::move(buf));
  }
  return {ioBufBatch.getPktSent(), 0, bytesWritten};
}