void HQSession::HQStreamTransportBase::onHeadersComplete()

in proxygen/lib/http/session/HQSession.cpp [2630:2724]


// Codec callback fired when the ingress header block of this stream has been
// fully parsed.
//
// Steps, in order:
//   1. stamp the message with the session ALPN and mark it secure;
//   2. let the session set up the transaction, aborting it if no handler
//      is attached afterwards;
//   3. notify the version utils, re-arm reading if bytes are still buffered,
//      log the event to qlog and apply the stream priority;
//   4. deliver the message to the transaction, or to the push-promise path
//      when a push id is latched;
//   5. flush any datagrams that were buffered for this stream.
//
// streamID must match codecStreamId_ (enforced with CHECK_EQ). Ownership of
// msg is moved into whichever callback receives the headers.
void HQSession::HQStreamTransportBase::onHeadersComplete(
    HTTPCodec::StreamID streamID, std::unique_ptr<HTTPMessage> msg) {
  VLOG(4) << __func__ << " txn=" << txn_;
  msg->dumpMessage(3);
  // TODO: the codec will set this for non-H1Q
  msg->setAdvancedProtocolString(session_.alpn_);
  msg->setSecure(true);
  CHECK(codecStreamId_);
  CHECK_EQ(streamID, *codecStreamId_);

  // Record the codec-reported user agent on the session if none is stored yet
  // and this message is a request.
  if (session_.userAgent_.empty() && msg->isRequest()) {
    session_.userAgent_ = session_.codec_->getUserAgent();
  }

  hasHeaders_ = true;
  // setupOnHeadersComplete is only implemented in the HQDownstreamSession,
  // which does not receive push promises. Will only be called once.
  session_.setupOnHeadersComplete(&txn_, msg.get());
  if (!txn_.getHandler()) {
    // Nothing is attached to consume the message: abort the transaction.
    txn_.sendAbort();
    return;
  }

  // For h1q-fb-v1, start draining on receipt of a Connection: close header.
  // If we are getting a response, transportReady has been called!
  DCHECK(session_.versionUtils_);
  session_.versionUtils_->headersComplete(msg.get());

  const auto streamId = getStreamId();

  // Header completion may have been triggered by data arriving on another
  // stream (specifically the QPACK encoder stream), in which case this
  // stream can still hold unparsed bytes. If so, and the stream is not
  // already queued, add it to the pending read set and schedule a loop
  // callback to restart processing.
  auto& pendingReads = session_.pendingProcessReadSet_;
  if (!readBuf_.empty() && pendingReads.find(streamId) == pendingReads.end()) {
    pendingReads.insert(streamId);
    session_.scheduleLoopCallback();
  }

  // Report the time elapsed since createdTime to the qlogger, when one is set.
  const auto sinceCreation =
      std::chrono::duration_cast<std::chrono::milliseconds>(
          std::chrono::steady_clock::now() - createdTime);
  auto sock = session_.sock_;
  if (sock && sock->getState() && sock->getState()->qLogger) {
    sock->getState()->qLogger->addStreamStateUpdate(
        streamId, quic::kOnHeaders, sinceCreation);
  }

  // A priority update received on the control stream before the headers
  // overrides the initial priority carried in the headers themselves.
  if (sock) {
    auto bufferedUpdate = session_.priorityUpdatesBuffer_.find(streamId);
    if (bufferedUpdate != session_.priorityUpdatesBuffer_.end()) {
      sock->setStreamPriority(streamId,
                              bufferedUpdate->second.urgency,
                              bufferedUpdate->second.incremental);
    } else if (const auto httpPriority = httpPriorityFromHTTPMessage(*msg)) {
      sock->setStreamPriority(
          streamId, httpPriority->urgency, httpPriority->incremental);
    }
  }

  // Hand the completed headers over for processing. Without a latched push
  // id the message goes to the current transaction; with one, it goes
  // through the onPushPromiseHeadersComplete callback (which delivers it to
  // a freshly created pushed transaction) and the latch is then cleared.
  if (!ingressPushId_) {
    txn_.onIngressHeadersComplete(std::move(msg));
  } else {
    onPushPromiseHeadersComplete(*ingressPushId_, streamID, std::move(msg));
    ingressPushId_ = folly::none;
  }
  if (auto activityTracker = session_.getHTTPSessionActivityTracker()) {
    activityTracker->reportActivity();
  }

  // The stream can now receive datagrams: deliver any that were buffered for
  // it to the transaction and drop the buffer entry.
  if (!session_.datagramEnabled_ || session_.datagramsBuffer_.empty()) {
    return;
  }
  auto buffered = session_.datagramsBuffer_.find(streamId);
  if (buffered == session_.datagramsBuffer_.end()) {
    return;
  }
  for (auto& datagram : buffered->second) {
    txn_.onDatagram(std::move(datagram));
  }
  session_.datagramsBuffer_.erase(buffered);
}