in quic/client/QuicClientTransport.cpp [1196:1319]
void QuicClientTransport::recvMsg(
folly::AsyncUDPSocket& sock,
uint64_t readBufferSize,
int numPackets,
NetworkData& networkData,
folly::Optional<folly::SocketAddress>& server,
size_t& totalData) {
for (int packetNum = 0; packetNum < numPackets; ++packetNum) {
// We create 1 buffer per packet so that it is not shared, this enables
// us to decrypt in place. If the fizz decrypt api could decrypt in-place
// even if shared, then we could allocate one giant IOBuf here.
Buf readBuffer = folly::IOBuf::create(readBufferSize);
struct iovec vec {};
vec.iov_base = readBuffer->writableData();
vec.iov_len = readBufferSize;
sockaddr* rawAddr{nullptr};
struct sockaddr_storage addrStorage {};
socklen_t addrLen{sizeof(addrStorage)};
if (!server) {
rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
rawAddr->sa_family = sock.address().getFamily();
}
int flags = 0;
folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams params;
struct msghdr msg {};
msg.msg_name = rawAddr;
msg.msg_namelen = size_t(addrLen);
msg.msg_iov = &vec;
msg.msg_iovlen = 1;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
bool useGRO = sock.getGRO() > 0;
bool useTS = sock.getTimestamping() > 0;
char control[folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams::
kCmsgSpace] = {};
if (useGRO || useTS) {
msg.msg_control = control;
msg.msg_controllen = sizeof(control);
// we need to consider MSG_TRUNC too
flags |= MSG_TRUNC;
}
#endif
ssize_t ret = sock.recvmsg(&msg, flags);
if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// If we got a retriable error, let us continue.
if (conn_->loopDetectorCallback) {
conn_->readDebugState.noReadReason = NoReadReason::RETRIABLE_ERROR;
}
break;
}
// If we got a non-retriable error, we might have received
// a packet that we could process, however let's just quit early.
sock.pauseRead();
if (conn_->loopDetectorCallback) {
conn_->readDebugState.noReadReason = NoReadReason::NONRETRIABLE_ERROR;
}
return onReadError(folly::AsyncSocketException(
folly::AsyncSocketException::INTERNAL_ERROR,
"::recvmsg() failed",
errno));
} else if (ret == 0) {
break;
}
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
if (useGRO) {
folly::AsyncUDPSocket::fromMsg(params, msg);
// truncated
if ((size_t)ret > readBufferSize) {
ret = readBufferSize;
if (params.gro > 0) {
ret = ret - ret % params.gro;
}
}
}
#endif
size_t bytesRead = size_t(ret);
totalData += bytesRead;
if (!server) {
server = folly::SocketAddress();
server->setFromSockaddr(rawAddr, addrLen);
}
VLOG(10) << "Got data from socket peer=" << *server << " len=" << bytesRead;
readBuffer->append(bytesRead);
if (params.gro > 0) {
size_t len = bytesRead;
size_t remaining = len;
size_t offset = 0;
size_t totalNumPackets =
networkData.packets.size() + ((len + params.gro - 1) / params.gro);
networkData.packets.reserve(totalNumPackets);
while (remaining) {
if (static_cast<int>(remaining) > params.gro) {
auto tmp = readBuffer->cloneOne();
// start at offset
tmp->trimStart(offset);
// the actual len is len - offset now
// leave gro bytes
tmp->trimEnd(len - offset - params.gro);
DCHECK_EQ(tmp->length(), params.gro);
offset += params.gro;
remaining -= params.gro;
networkData.packets.emplace_back(std::move(tmp));
} else {
// do not clone the last packet
// start at offset, use all the remaining data
readBuffer->trimStart(offset);
DCHECK_EQ(readBuffer->length(), remaining);
remaining = 0;
networkData.packets.emplace_back(std::move(readBuffer));
}
}
} else {
networkData.packets.emplace_back(std::move(readBuffer));
}
trackDatagramReceived(bytesRead);
}
}