in proxygen/lib/http/session/HQSession.cpp [2042:2169]
uint64_t HQSession::requestStreamWriteImpl(HQStreamTransportBase* hqStream,
uint64_t maxEgress,
double ratio) {
CHECK(hqStream->queueHandle_.isStreamTransportEnqueued());
HTTPTransaction::DestructorGuard dg(&hqStream->txn_);
auto streamId = hqStream->getStreamId();
auto flowControl = sock_->getStreamFlowControl(streamId);
if (flowControl.hasError()) {
LOG(ERROR)
<< "Got error=" << flowControl.error() << " streamID=" << streamId
<< " detached=" << hqStream->detached_
<< " readBufLen=" << static_cast<int>(hqStream->readBuf_.chainLength())
<< " writeBufLen=" << static_cast<int>(hqStream->writeBufferSize())
<< " readEOF=" << hqStream->readEOF_
<< " ingressError_=" << static_cast<int>(hqStream->ingressError_)
<< " eomGate_=" << hqStream->eomGate_;
handleWriteError(hqStream, flowControl.error());
return 0;
}
auto streamSendWindow = flowControl->sendWindowAvailable;
size_t canSend = std::min(streamSendWindow, maxEgress);
// we may have already buffered more than the amount the transport can take,
// or the txn may not have any more body bytes/EOM to add. In case, there is
// no need to call txn->onWriteReady.
if (hqStream->wantsOnWriteReady(canSend)) {
// Populate the write buffer by telling the transaction how much
// room is available for body data
size_t maxBodySend = canSend - hqStream->writeBufferSize();
VLOG(4) << __func__ << " asking txn for more bytes sess=" << *this
<< ": streamID=" << streamId << " canSend=" << canSend
<< " remain=" << hqStream->writeBufferSize()
<< " pendingEOM=" << hqStream->pendingEOM_
<< " maxBodySend=" << maxBodySend << " ratio=" << ratio;
hqStream->txn_.onWriteReady(maxBodySend, ratio);
// onWriteReady may not be able to detach any byte from the deferred egress
// body bytes, in case it's getting rate limited.
// In that case the txn will get removed from the egress queue from
// onWriteReady
if (!hqStream->hasWriteBuffer() && !hqStream->pendingEOM_) {
return 0;
}
}
auto bufWritter = [&](quic::StreamId streamId,
std::unique_ptr<folly::IOBuf> data,
bool sendEof,
quic::QuicSocket::DeliveryCallback* deliveryCallback) {
return sock_->writeChain(
streamId, std::move(data), sendEof, deliveryCallback);
};
auto bufMetaWritter =
[&](quic::StreamId streamId,
quic::BufferMeta bufMeta,
bool sendEof,
quic::QuicSocket::DeliveryCallback* deliveryCallback) {
return sock_->writeBufMeta(
streamId, bufMeta, sendEof, deliveryCallback);
};
size_t sent = 0;
auto bufSendLen = std::min(canSend, hqStream->writeBuf_.chainLength());
auto tryWriteBuf = hqStream->writeBuf_.splitAtMost(canSend);
bool sendEof = (hqStream->pendingEOM_ && !hqStream->hasPendingBody());
if (bufSendLen > 0 || sendEof) {
VLOG(4) << __func__ << " before write sess=" << *this
<< ": streamID=" << streamId << " maxEgress=" << maxEgress
<< " sendWindow=" << streamSendWindow
<< " tryToSend=" << tryWriteBuf->computeChainDataLength()
<< " sendEof=" << sendEof;
sent = handleWrite(std::move(bufWritter),
hqStream,
std::move(tryWriteBuf),
bufSendLen,
sendEof);
}
auto bufMetaWriteLen =
std::min(canSend - bufSendLen, hqStream->bufMeta_.length);
auto splitBufMeta = hqStream->bufMeta_.split(bufMetaWriteLen);
// Refresh sendEof after previous write and the bufMEta split.
sendEof = (hqStream->pendingEOM_ && !hqStream->hasPendingBody());
if (sendEof || splitBufMeta.length > 0) {
quic::BufferMeta quicBufMeta(splitBufMeta.length);
sent += handleWrite(std::move(bufMetaWritter),
hqStream,
quicBufMeta,
quicBufMeta.length,
sendEof);
}
VLOG(4) << __func__ << " after write sess=" << *this
<< ": streamID=" << streamId << " sent=" << sent
<< " buflen=" << hqStream->writeBufferSize()
<< " hasPendingBody=" << hqStream->txn_.hasPendingBody()
<< " EOM=" << hqStream->pendingEOM_;
if (infoCallback_) {
infoCallback_->onWrite(*this, sent);
}
CHECK_GE(maxEgress, sent);
bool flowControlBlocked = (sent == streamSendWindow && !sendEof);
if (flowControlBlocked) {
// TODO: this one doesn't create trouble, but it's certainly not logging the
// extra params anyway.
if (sock_ && sock_->getState() && sock_->getState()->qLogger) {
sock_->getState()->qLogger->addStreamStateUpdate(
streamId,
quic::kStreamBlocked,
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - hqStream->createdTime));
}
}
// sendAbort can clear the egress queue, so this stream may no longer be
// enqueued
if (hqStream->queueHandle_.isStreamTransportEnqueued() &&
(!hqStream->hasPendingEgress() || flowControlBlocked)) {
VLOG(4) << "clearPendingEgress for " << hqStream->txn_;
txnEgressQueue_.clearPendingEgress(hqStream->queueHandle_.getHandle());
}
if (flowControlBlocked && !hqStream->txn_.isEgressComplete()) {
VLOG(4) << __func__ << " txn flow control blocked, txn=" << hqStream->txn_;
hqStream->txn_.pauseEgress();
}
return sent;
}