in proxygen/lib/http/session/HQSession.cpp [3696:3757]
void HQSession::onDatagramsAvailable() noexcept {
auto result = sock_->readDatagramBufs();
if (result.hasError()) {
LOG(ERROR) << "Got error while reading datagrams: error="
<< toString(result.error());
dropConnectionAsync(quic::QuicError(HTTP3::ErrorCode::HTTP_INTERNAL_ERROR,
"H3_DATAGRAM: internal error "),
kErrorConnection);
return;
}
VLOG(4) << "Received " << result.value().size()
<< " datagrams. sess=" << *this;
for (auto& datagram : result.value()) {
folly::io::Cursor cursor(datagram.get());
auto quarterStreamId = quic::decodeQuicInteger(cursor);
if (!quarterStreamId) {
dropConnectionAsync(
quic::QuicError(HTTP3::ErrorCode::HTTP_GENERAL_PROTOCOL_ERROR,
"H3_DATAGRAM: error decoding stream-id"),
kErrorConnection);
}
auto ctxId = quic::decodeQuicInteger(cursor);
if (!ctxId) {
dropConnectionAsync(
quic::QuicError(HTTP3::ErrorCode::HTTP_GENERAL_PROTOCOL_ERROR,
"H3_DATAGRAM: error decoding context-id"),
kErrorConnection);
}
quic::BufQueue datagramQ;
datagramQ.append(std::move(datagram));
datagramQ.trimStart(quarterStreamId->second + ctxId->second);
auto streamId = quarterStreamId->first * 4;
auto stream = findNonDetachedStream(streamId);
if (!stream || !stream->hasHeaders_) {
VLOG(4) << "Stream cannot receive datagrams yet. streamId=" << streamId
<< " ctx=" << ctxId->first << " len=" << datagramQ.chainLength()
<< " sess=" << *this;
// TODO: a possible optimization would be to discard datagrams destined
// to streams that were already closed
auto itr = datagramsBuffer_.find(streamId);
if (itr == datagramsBuffer_.end()) {
itr = datagramsBuffer_.insert(streamId, {}).first;
}
auto& vec = itr->second;
if (vec.size() < vec.max_size()) {
vec.emplace_back(datagramQ.move());
} else {
// buffer is full: discard the datagram
datagramQ.move();
}
continue;
}
VLOG(4) << "Received datagram for streamId=" << streamId
<< " ctx=" << ctxId->first << " len=" << datagramQ.chainLength()
<< " sess=" << *this;
stream->txn_.onDatagram(datagramQ.move());
}
}