in proxygen/lib/http/session/HQSession.cpp [2630:2724]
// Codec callback fired once the full header block of an ingress message on
// this stream has been parsed.
//
// Steps, in order:
//   1. Stamp the message with the session ALPN and mark it secure.
//   2. Let the session do its header-time setup for the transaction; abort
//      the transaction and bail out if no handler is installed afterwards.
//   3. Notify the version utils about the headers.
//   4. Queue this stream for another read-processing pass when readBuf_ still
//      holds data.
//   5. Emit a qlog stream-state update and apply the stream priority.
//   6. Hand the message to the transaction, or to the push-promise path when
//      a push id is latched.
//   7. Report session activity and deliver datagrams buffered for the stream.
//
// streamID  codec stream id; CHECK'ed to equal *codecStreamId_.
// msg       the parsed message; ownership is moved into the transaction or
//           into onPushPromiseHeadersComplete.
void HQSession::HQStreamTransportBase::onHeadersComplete(
    HTTPCodec::StreamID streamID, std::unique_ptr<HTTPMessage> msg) {
  VLOG(4) << __func__ << " txn=" << txn_;
  msg->dumpMessage(3);
  // TODO: the codec will set this for non-H1Q
  msg->setAdvancedProtocolString(session_.alpn_);
  msg->setSecure(true);
  CHECK(codecStreamId_);
  CHECK_EQ(streamID, *codecStreamId_);

  // Store the codec-reported user agent on the session when a request
  // arrives and none has been stored yet.
  if (msg->isRequest()) {
    if (session_.userAgent_.empty()) {
      session_.userAgent_ = session_.codec_->getUserAgent();
    }
  }
  hasHeaders_ = true;

  // setupOnHeadersComplete is only implemented in the HQDownstreamSession,
  // which does not receive push promises. Will only be called once.
  session_.setupOnHeadersComplete(&txn_, msg.get());
  if (!txn_.getHandler()) {
    // Nobody is there to consume the message: abort and stop here.
    txn_.sendAbort();
    return;
  }

  // for h1q-fb-v1 start draining on receipt of a Connection:: close header
  // if we are getting a response, transportReady has been called!
  DCHECK(session_.versionUtils_);
  session_.versionUtils_->headersComplete(msg.get());

  // This callback may have been triggered by data arriving on a different
  // stream (specifically the QPACK encoder stream). In that case unparsed
  // bytes may still sit in this transport's buffer, so put the stream into
  // the pending read set (unless already there) and schedule a loop callback
  // to resume processing.
  const bool alreadyPending =
      session_.pendingProcessReadSet_.find(getStreamId()) !=
      session_.pendingProcessReadSet_.end();
  if (!alreadyPending && !readBuf_.empty()) {
    session_.pendingProcessReadSet_.insert(getStreamId());
    session_.scheduleLoopCallback();
  }

  auto sinceCreation = std::chrono::duration_cast<std::chrono::milliseconds>(
      std::chrono::steady_clock::now() - createdTime);
  auto quicSocket = session_.sock_;
  auto quicStreamId = getStreamId();
  if (quicSocket) {
    // Record the "headers received" event in the qlog, if a logger exists.
    if (quicSocket->getState() && quicSocket->getState()->qLogger) {
      quicSocket->getState()->qLogger->addStreamStateUpdate(
          quicStreamId, quic::kOnHeaders, sinceCreation);
    }

    // A priority update received earlier on the control stream overrides
    // the initial priority carried in the headers; otherwise fall back to
    // whatever priority the message itself provides, if any.
    auto bufferedUpdate = session_.priorityUpdatesBuffer_.find(quicStreamId);
    if (bufferedUpdate != session_.priorityUpdatesBuffer_.end()) {
      quicSocket->setStreamPriority(quicStreamId,
                                    bufferedUpdate->second.urgency,
                                    bufferedUpdate->second.incremental);
    } else if (const auto headerPriority = httpPriorityFromHTTPMessage(*msg)) {
      quicSocket->setStreamPriority(
          quicStreamId, headerPriority->urgency, headerPriority->incremental);
    }
  }

  // The full ingress headers have arrived, so the message can be processed.
  // Without a latched push id it goes straight to the current transaction;
  // with one, it is routed through onPushPromiseHeadersComplete and the
  // latch is cleared afterwards.
  if (!ingressPushId_) {
    txn_.onIngressHeadersComplete(std::move(msg));
  } else {
    onPushPromiseHeadersComplete(*ingressPushId_, streamID, std::move(msg));
    ingressPushId_ = folly::none;
  }

  if (auto activityTracker = session_.getHTTPSessionActivityTracker()) {
    activityTracker->reportActivity();
  }

  // The stream can now receive datagrams: deliver any that were buffered for
  // it to the handler, then drop the buffer entry.
  if (!session_.datagramEnabled_ || session_.datagramsBuffer_.empty()) {
    return;
  }
  auto buffered = session_.datagramsBuffer_.find(quicStreamId);
  if (buffered == session_.datagramsBuffer_.end()) {
    return;
  }
  for (auto& pendingDatagram : buffered->second) {
    txn_.onDatagram(std::move(pendingDatagram));
  }
  session_.datagramsBuffer_.erase(buffered);
}