in proxygen/lib/http/session/HTTPSession.cpp [1536:1607]
/**
 * Serialize the headers of `txn` into the session write buffer through the
 * codec, then schedule a write.
 *
 * @param txn         Transaction the headers belong to; its ID, push state,
 *                    ex-attributes and associated stream decide which codec
 *                    generator is used.
 * @param headers     Message whose headers are written.
 * @param size        Optional out-parameter forwarded to the codec; when
 *                    non-null its compressed/uncompressed sizes are logged.
 * @param includeEOM  Forwarded to the codec; when true, commonEom() is also
 *                    invoked with the number of bytes this call produced.
 *
 * The session must already be started (enforced with CHECK).
 */
void HTTPSession::sendHeaders(HTTPTransaction* txn,
                              const HTTPMessage& headers,
                              HTTPHeaderSize* size,
                              bool includeEOM) noexcept {
  CHECK(started_);

  // An upstream session that is draining, whose codec is still reusable and
  // whose transactions have all started, runs drainImpl() now:
  //   - for HTTP/1.1 this adds Connection: close
  //   - for SPDY/H2 the goaway is saved so it goes out AFTER the request
  // The bytes already queued are set aside first so that whatever drainImpl()
  // leaves in writeBuf_ can be captured on its own, then the queued bytes are
  // put back.
  unique_ptr<IOBuf> deferredGoaway;
  const bool drainBeforeHeaders = draining_ && isUpstream() &&
                                  codec_->isReusable() &&
                                  allTransactionsStarted();
  if (drainBeforeHeaders) {
    auto queuedBytes = writeBuf_.move();
    drainImpl();
    deferredGoaway = writeBuf_.move();
    writeBuf_.append(std::move(queuedBytes));
  }

  // The upstream side (or a pushed request) picks the priority, provided
  // HTTP/2 priorities are enabled on this session.
  if ((isUpstream() || (txn->isPushed() && headers.isRequest())) &&
      getHTTP2PrioritiesEnabled()) {
    auto pri = getMessagePriority(&headers);
    const bool dependsOnItself = pri.streamDependency == txn->getID();
    if (dependsOnItself) {
      LOG(ERROR) << "Attempted to create circular dependency txn=" << *this;
    } else {
      txn->onPriorityUpdate(pri);
    }
  }

  // Snapshot codec reusability and the byte offset before generating, so the
  // size of the generated headers can be derived afterwards.
  const bool reusableBefore = codec_->isReusable();
  const uint64_t offsetBefore = sessionByteOffset();
  auto exAttrs = txn->getExAttributes();
  auto parentStream = txn->getAssocTxnId();

  if (exAttrs) {
    codec_->generateExHeader(
        writeBuf_, txn->getID(), headers, *exAttrs, includeEOM, size);
  } else if (headers.isRequest() && parentStream) {
    // Only PUSH_PROMISE (not the push response) has an associated stream.
    codec_->generatePushPromise(
        writeBuf_, txn->getID(), headers, *parentStream, includeEOM, size);
  } else {
    codec_->generateHeader(writeBuf_, txn->getID(), headers, includeEOM, size);
  }
  const uint64_t offsetAfter = sessionByteOffset();

  // A push response counts towards the MAX_CONCURRENT_STREAMS limit.
  if (isDownstream() && headers.isResponse() && txn->isPushed()) {
    incrementOutgoingStreams(txn);
  }

  // Upstream: every header block is eligible for a first-header-byte event.
  // Downstream: only response headers are.
  const bool eligibleForFirstHeaderByte =
      isUpstream() || (isDownstream() && headers.isResponse());
  if (eligibleForFirstHeaderByte && offsetAfter > offsetBefore) {
    // testAndSetFirstHeaderByteSent() is only consulted once bytes were
    // actually produced; the event is registered when it returns false and a
    // tracker is installed.
    if (!txn->testAndSetFirstHeaderByteSent() && byteEventTracker_) {
      byteEventTracker_->addFirstHeaderByteEvent(offsetAfter, txn);
    }
  }

  if (size) {
    VLOG(4) << *this << " sending headers, size=" << size->compressed
            << ", uncompressedSize=" << size->uncompressed;
  }

  // Re-attach the drain output captured above behind the freshly written
  // headers. This happens after offsetAfter was sampled.
  if (deferredGoaway) {
    VLOG(4) << *this << " moved GOAWAY to end of writeBuf";
    writeBuf_.append(std::move(deferredGoaway));
  }

  if (includeEOM) {
    CHECK_GE(offsetAfter, offsetBefore);
    commonEom(txn, offsetAfter - offsetBefore, true);
  }

  scheduleWrite();
  onHeadersSent(headers, reusableBefore);
}