void HQSession::onDatagramsAvailable()

in proxygen/lib/http/session/HQSession.cpp [3696:3757]


void HQSession::onDatagramsAvailable() noexcept {
  auto result = sock_->readDatagramBufs();
  if (result.hasError()) {
    LOG(ERROR) << "Got error while reading datagrams: error="
               << toString(result.error());
    dropConnectionAsync(quic::QuicError(HTTP3::ErrorCode::HTTP_INTERNAL_ERROR,
                                        "H3_DATAGRAM: internal error "),
                        kErrorConnection);
    return;
  }
  VLOG(4) << "Received " << result.value().size()
          << " datagrams. sess=" << *this;
  for (auto& datagram : result.value()) {
    folly::io::Cursor cursor(datagram.get());
    auto quarterStreamId = quic::decodeQuicInteger(cursor);
    if (!quarterStreamId) {
      dropConnectionAsync(
          quic::QuicError(HTTP3::ErrorCode::HTTP_GENERAL_PROTOCOL_ERROR,
                          "H3_DATAGRAM: error decoding stream-id"),
          kErrorConnection);
    }
    auto ctxId = quic::decodeQuicInteger(cursor);
    if (!ctxId) {
      dropConnectionAsync(
          quic::QuicError(HTTP3::ErrorCode::HTTP_GENERAL_PROTOCOL_ERROR,
                          "H3_DATAGRAM: error decoding context-id"),
          kErrorConnection);
    }

    quic::BufQueue datagramQ;
    datagramQ.append(std::move(datagram));
    datagramQ.trimStart(quarterStreamId->second + ctxId->second);

    auto streamId = quarterStreamId->first * 4;
    auto stream = findNonDetachedStream(streamId);

    if (!stream || !stream->hasHeaders_) {
      VLOG(4) << "Stream cannot receive datagrams yet. streamId=" << streamId
              << " ctx=" << ctxId->first << " len=" << datagramQ.chainLength()
              << " sess=" << *this;
      // TODO: a possible optimization would be to discard datagrams destined
      // to streams that were already closed
      auto itr = datagramsBuffer_.find(streamId);
      if (itr == datagramsBuffer_.end()) {
        itr = datagramsBuffer_.insert(streamId, {}).first;
      }
      auto& vec = itr->second;
      if (vec.size() < vec.max_size()) {
        vec.emplace_back(datagramQ.move());
      } else {
        // buffer is full: discard the datagram
        datagramQ.move();
      }
      continue;
    }

    VLOG(4) << "Received datagram for streamId=" << streamId
            << " ctx=" << ctxId->first << " len=" << datagramQ.chainLength()
            << " sess=" << *this;
    stream->txn_.onDatagram(datagramQ.move());
  }
}