in proxygen/lib/http/session/HTTPSession.cpp [2152:2245]
// Loop callback that pushes queued egress to the socket and, when reads are
// unpaused, calls processReadData() and reinstalls the read callback.
//
// We schedule this callback to run at the end of an event
// loop iteration if either of two conditions has happened:
//   * The session has generated some egress data (see scheduleWrite())
//   * Reads have become unpaused (see resumeReads())
//
// At most kMaxWritesPerLoop buffers are pulled from getNextToSend() per
// invocation. The write loop stops early when there is nothing left to send,
// or when a write handed to the socket is still outstanding
// (numActiveWrites_ > 0) after writeChain() returns.
//
// Regardless of which path exits the function, the scope guard below clears
// inLoopCallback_, calls updatePendingWrites(), optionally notifies every
// transaction via checkIfEgressRateLimitedByUpstream(), and finally calls
// checkForShutdown().
void HTTPSession::runLoopCallback() noexcept {
  // Declared first so it is destroyed last: the scope guard's cleanup below
  // runs while this DestructorGuard is still in place.
  DestructorGuard dg(this);
  inLoopCallback_ = true;
  auto scopeg = folly::makeGuard([this] {
    inLoopCallback_ = false;
    // This ScopeGuard needs to be under the above DestructorGuard
    updatePendingWrites();
    if (!hasMoreWrites()) {
      invokeOnAllTransactions([](HTTPTransaction* txn) {
        txn->checkIfEgressRateLimitedByUpstream();
      });
    }
    checkForShutdown();
  });
  VLOG(5) << *this << " in loop callback";

  for (uint32_t i = 0; i < kMaxWritesPerLoop; ++i) {
    // Reset the per-buffer body byte counter before building the next buffer.
    bodyBytesPerWriteBuf_ = 0;
    if (isPrioritySampled()) {
      invokeOnAllTransactions([this](HTTPTransaction* txn) {
        txn->updateContentionsCount(txnEgressQueue_.numPendingEgress());
      });
    }

    // Out-parameters filled in by getNextToSend(); they select the write
    // flags used for this buffer.
    bool cork = true;
    bool timestampTx = false;
    bool timestampAck = false;
    unique_ptr<IOBuf> writeBuf =
        getNextToSend(&cork, &timestampTx, &timestampAck);
    if (!writeBuf) {
      // Nothing to send right now.
      break;
    }

    uint64_t len = writeBuf->computeChainDataLength();
    VLOG(11) << *this << " bytes of egress to be written: " << len
             << " cork:" << cork << " timestampTx:" << timestampTx
             << " timestampAck:" << timestampAck;
    if (len == 0) {
      // A non-null but empty chain: skip the write and the rest of this
      // callback. The scope guard still runs on return, so
      // checkForShutdown() is invoked again from there.
      checkForShutdown();
      return;
    }

    if (isPrioritySampled()) {
      invokeOnAllTransactions([this](HTTPTransaction* txn) {
        txn->updateSessionBytesSheduled(bodyBytesPerWriteBuf_);
      });
    }

    folly::WriteFlags flags = folly::WriteFlags::NONE;
    flags |= (cork) ? folly::WriteFlags::CORK : folly::WriteFlags::NONE;
    flags |= (timestampTx) ? folly::WriteFlags::TIMESTAMP_TX
                           : folly::WriteFlags::NONE;
    // Note: a requested ACK timestamp is expressed with the EOR write flag.
    flags |= (timestampAck) ? folly::WriteFlags::EOR : folly::WriteFlags::NONE;

    // Only one write may be tracked in pendingWrite_ at a time. The entry
    // records the byte count and holds its own DestructorGuard on the
    // session.
    CHECK(!pendingWrite_.hasValue());
    pendingWrite_.emplace(len, DestructorGuard(this));
    if (!writeTimeout_.isScheduled()) {
      // Any performance concern here?
      wheelTimer_.scheduleTimeout(&writeTimeout_);
    }
    numActiveWrites_++;
    VLOG(4) << *this << " writing " << len
            << ", activeWrites=" << numActiveWrites_ << " cork:" << cork
            << " timestampTx:" << timestampTx
            << " timestampAck:" << timestampAck;
    bytesScheduled_ += len;
    sock_->writeChain(this, std::move(writeBuf), flags);

    if (numActiveWrites_ > 0) {
      // The write is still outstanding after writeChain() returned
      // (presumably the write callbacks decrement numActiveWrites_ -- they
      // are not visible here). Account for the buffered bytes and stop
      // issuing further writes in this callback.
      updateWriteCount();
      HTTPSessionBase::notifyEgressBodyBuffered(len, false);
      // updatePendingWrites() is called in the scope guard
      break;
    }
    // writeChain can result in a writeError and trigger the shutdown code path
  }

  // No write in flight but more data is queued: reschedule this callback,
  // provided writes are not shut down and connection-level flow control
  // (when present) still reports available send window.
  if (numActiveWrites_ == 0 && !writesShutdown() && hasMoreWrites() &&
      (!connFlowControl_ || connFlowControl_->getAvailableSend())) {
    scheduleWrite();
  }

  if (readsUnpaused()) {
    processReadData();
    // Install the read callback if necessary. readsUnpaused() is re-checked
    // because it is evaluated again after processReadData() has run.
    if (readsUnpaused() && !sock_->getReadCallback()) {
      sock_->setReadCB(this);
    }
  }
  // checkForShutdown is now in ScopeGuard
}