in quic/client/QuicClientTransport.cpp [1321:1475]
/**
 * Reads up to numPackets datagrams from sock with one recvmmsg() call and
 * appends the received payloads to networkData.packets.
 *
 * @param sock            Socket to read from.
 * @param readBufferSize  Capacity used for every per-datagram read buffer.
 * @param numPackets      Maximum number of datagrams to request. The
 *                        recvmmsgStorage_ containers are indexed up to
 *                        numPackets - 1 here, so they are assumed to have been
 *                        sized by the caller -- TODO confirm.
 * @param networkData     Receives one entry per datagram, or one entry per
 *                        GRO segment when the kernel reports a segment size.
 * @param server          Filled with the peer address of the first non-empty
 *                        datagram if it is not already set.
 * @param totalData       Incremented by the number of payload bytes accepted.
 *
 * On EAGAIN/EWOULDBLOCK the function returns without reporting an error. On
 * any other recvmmsg() failure it pauses reads on sock and forwards an
 * AsyncSocketException to onReadError().
 */
void QuicClientTransport::recvMmsg(
    folly::AsyncUDPSocket& sock,
    uint64_t readBufferSize,
    int numPackets,
    NetworkData& networkData,
    folly::Optional<folly::SocketAddress>& server,
    size_t& totalData) {
  const size_t addrLen = sizeof(struct sockaddr_storage);
  auto& msgs = recvmmsgStorage_.msgs;
  auto& addrs = recvmmsgStorage_.addrs;
  auto& readBuffers = recvmmsgStorage_.readBuffers;
  auto& iovecs = recvmmsgStorage_.iovecs;
  auto& freeBufs = recvmmsgStorage_.freeBufs;
  int flags = 0;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
  bool useGRO = sock.getGRO() > 0;
  bool useTS = sock.getTimestamping() > 0;
  // Control buffers are attached to every message whenever GRO *or*
  // timestamping is enabled (see the loop below), so the vector must be sized
  // for both cases; sizing it for GRO only would index an empty vector when
  // only timestamping is on.
  std::vector<std::array<
      char,
      folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams::kCmsgSpace>>
      controlVec((useGRO || useTS) ? numPackets : 0);
  // we need to consider MSG_TRUNC too
  if (useGRO) {
    flags |= MSG_TRUNC;
  }
#endif
  // Prepare one buffer / iovec / msghdr triple per requested datagram,
  // reusing previously recycled buffers when available.
  for (int i = 0; i < numPackets; ++i) {
    Buf readBuffer;
    if (freeBufs.empty()) {
      readBuffer = folly::IOBuf::create(readBufferSize);
    } else {
      readBuffer = std::move(freeBufs.back());
      DCHECK(readBuffer != nullptr);
      freeBufs.pop_back();
    }
    iovecs[i].iov_base = readBuffer->writableData();
    iovecs[i].iov_len = readBufferSize;
    readBuffers[i] = std::move(readBuffer);
    auto* rawAddr = reinterpret_cast<sockaddr*>(&addrs[i]);
    rawAddr->sa_family = socket_->address().getFamily();
    struct msghdr* msg = &msgs[i].msg_hdr;
    msg->msg_name = rawAddr;
    msg->msg_namelen = addrLen;
    msg->msg_iov = &iovecs[i];
    msg->msg_iovlen = 1;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
    if (useGRO || useTS) {
      ::memset(controlVec[i].data(), 0, controlVec[i].size());
      msg->msg_control = controlVec[i].data();
      msg->msg_controllen = controlVec[i].size();
    }
#endif
  }
  int numMsgsRecvd = sock.recvmmsg(msgs.data(), numPackets, flags, nullptr);
  if (numMsgsRecvd < 0) {
    // Snapshot errno right away: the calls below may overwrite it before it
    // is reported.
    const int recvErrno = errno;
    // NOTE(review): buffers already moved into readBuffers are not returned to
    // freeBufs on these early-return paths.
    if (recvErrno == EAGAIN || recvErrno == EWOULDBLOCK) {
      // Exit, socket will notify us again when socket is readable.
      if (conn_->loopDetectorCallback) {
        conn_->readDebugState.noReadReason = NoReadReason::RETRIABLE_ERROR;
      }
      return;
    }
    // If we got a non-retriable error, we might have received
    // a packet that we could process, however let's just quit early.
    sock.pauseRead();
    if (conn_->loopDetectorCallback) {
      conn_->readDebugState.noReadReason = NoReadReason::NONRETRIABLE_ERROR;
    }
    return onReadError(folly::AsyncSocketException(
        folly::AsyncSocketException::INTERNAL_ERROR,
        "::recvmmsg() failed",
        recvErrno));
  }
  CHECK_LE(numMsgsRecvd, numPackets);
  // Need to save our position so we can recycle the unused buffers.
  int i;
  for (i = 0; i < numMsgsRecvd; ++i) {
    size_t bytesRead = msgs[i].msg_len;
    if (bytesRead == 0) {
      // Empty datagram, this is probably garbage matching our tuple, we should
      // ignore such datagrams.
      freeBufs.emplace_back(std::move(readBuffers[i]));
      continue;
    }
    folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams params;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
    if (useGRO || useTS) {
      folly::AsyncUDPSocket::fromMsg(params, msgs[i].msg_hdr);
      // truncated
      if (bytesRead > readBufferSize) {
        bytesRead = readBufferSize;
        if (params.gro > 0) {
          // Keep only whole GRO segments; drop the partial trailing one.
          bytesRead = bytesRead - bytesRead % params.gro;
        }
      }
    }
#endif
    totalData += bytesRead;
    if (!server) {
      server = folly::SocketAddress();
      auto* rawAddr = reinterpret_cast<sockaddr*>(&addrs[i]);
      server->setFromSockaddr(rawAddr, addrLen);
    }
    VLOG(10) << "Got data from socket peer=" << *server << " len=" << bytesRead;
    readBuffers[i]->append(bytesRead);
    if (params.gro > 0) {
      // The buffer holds several coalesced packets of params.gro bytes each
      // (the last one may be shorter); split it into one entry per segment.
      size_t len = bytesRead;
      size_t remaining = len;
      size_t offset = 0;
      size_t totalNumPackets =
          networkData.packets.size() + ((len + params.gro - 1) / params.gro);
      networkData.packets.reserve(totalNumPackets);
      while (remaining) {
        if (static_cast<int>(remaining) > params.gro) {
          auto tmp = readBuffers[i]->cloneOne();
          // start at offset
          tmp->trimStart(offset);
          // the actual len is len - offset now
          // leave gro bytes
          tmp->trimEnd(len - offset - params.gro);
          DCHECK_EQ(tmp->length(), params.gro);
          offset += params.gro;
          remaining -= params.gro;
          networkData.packets.emplace_back(std::move(tmp));
        } else {
          // do not clone the last packet
          // start at offset, use all the remaining data
          readBuffers[i]->trimStart(offset);
          DCHECK_EQ(readBuffers[i]->length(), remaining);
          remaining = 0;
          networkData.packets.emplace_back(std::move(readBuffers[i]));
        }
      }
    } else {
      networkData.packets.emplace_back(std::move(readBuffers[i]));
    }
    trackDatagramReceived(bytesRead);
  }
  // Buffers beyond the number of datagrams actually received were never
  // filled; hand them back for reuse on the next call.
  for (; i < numPackets; i++) {
    freeBufs.emplace_back(std::move(readBuffers[i]));
    DCHECK(freeBufs.back() != nullptr);
  }
}