void QuicClientTransport::recvMmsg()

in quic/client/QuicClientTransport.cpp [1321:1475]


void QuicClientTransport::recvMmsg(
    folly::AsyncUDPSocket& sock,
    uint64_t readBufferSize,
    int numPackets,
    NetworkData& networkData,
    folly::Optional<folly::SocketAddress>& server,
    size_t& totalData) {
  const size_t addrLen = sizeof(struct sockaddr_storage);

  auto& msgs = recvmmsgStorage_.msgs;
  auto& addrs = recvmmsgStorage_.addrs;
  auto& readBuffers = recvmmsgStorage_.readBuffers;
  auto& iovecs = recvmmsgStorage_.iovecs;
  auto& freeBufs = recvmmsgStorage_.freeBufs;
  int flags = 0;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
  bool useGRO = sock.getGRO() > 0;
  bool useTS = sock.getTimestamping() > 0;
  std::vector<std::array<
      char,
      folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams::kCmsgSpace>>
      controlVec(useGRO ? numPackets : 0);

  // we need to consider MSG_TRUNC too
  if (useGRO) {
    flags |= MSG_TRUNC;
  }
#endif

  for (int i = 0; i < numPackets; ++i) {
    Buf readBuffer;
    if (freeBufs.empty()) {
      readBuffer = folly::IOBuf::create(readBufferSize);
    } else {
      readBuffer = std::move(freeBufs.back());
      DCHECK(readBuffer != nullptr);
      freeBufs.pop_back();
    }
    iovecs[i].iov_base = readBuffer->writableData();
    iovecs[i].iov_len = readBufferSize;
    readBuffers[i] = std::move(readBuffer);

    auto* rawAddr = reinterpret_cast<sockaddr*>(&addrs[i]);
    rawAddr->sa_family = socket_->address().getFamily();

    struct msghdr* msg = &msgs[i].msg_hdr;
    msg->msg_name = rawAddr;
    msg->msg_namelen = addrLen;
    msg->msg_iov = &iovecs[i];
    msg->msg_iovlen = 1;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
    if (useGRO || useTS) {
      ::memset(controlVec[i].data(), 0, controlVec[i].size());
      msg->msg_control = controlVec[i].data();
      msg->msg_controllen = controlVec[i].size();
    }
#endif
  }

  int numMsgsRecvd = sock.recvmmsg(msgs.data(), numPackets, flags, nullptr);
  if (numMsgsRecvd < 0) {
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
      // Exit, socket will notify us again when socket is readable.
      if (conn_->loopDetectorCallback) {
        conn_->readDebugState.noReadReason = NoReadReason::RETRIABLE_ERROR;
      }
      return;
    }
    // If we got a non-retriable error, we might have received
    // a packet that we could process, however let's just quit early.
    sock.pauseRead();
    if (conn_->loopDetectorCallback) {
      conn_->readDebugState.noReadReason = NoReadReason::NONRETRIABLE_ERROR;
    }
    return onReadError(folly::AsyncSocketException(
        folly::AsyncSocketException::INTERNAL_ERROR,
        "::recvmmsg() failed",
        errno));
  }

  CHECK_LE(numMsgsRecvd, numPackets);
  // Need to save our position so we can recycle the unused buffers.
  int i;
  for (i = 0; i < numMsgsRecvd; ++i) {
    size_t bytesRead = msgs[i].msg_len;
    if (bytesRead == 0) {
      // Empty datagram, this is probably garbage matching our tuple, we should
      // ignore such datagrams.
      freeBufs.emplace_back(std::move(readBuffers[i]));
      continue;
    }
    folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams params;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
    if (useGRO || useTS) {
      folly::AsyncUDPSocket::fromMsg(params, msgs[i].msg_hdr);

      // truncated
      if (bytesRead > readBufferSize) {
        bytesRead = readBufferSize;
        if (params.gro > 0) {
          bytesRead = bytesRead - bytesRead % params.gro;
        }
      }
    }
#endif
    totalData += bytesRead;

    if (!server) {
      server = folly::SocketAddress();
      auto* rawAddr = reinterpret_cast<sockaddr*>(&addrs[i]);
      server->setFromSockaddr(rawAddr, addrLen);
    }

    VLOG(10) << "Got data from socket peer=" << *server << " len=" << bytesRead;
    readBuffers[i]->append(bytesRead);
    if (params.gro > 0) {
      size_t len = bytesRead;
      size_t remaining = len;
      size_t offset = 0;
      size_t totalNumPackets =
          networkData.packets.size() + ((len + params.gro - 1) / params.gro);
      networkData.packets.reserve(totalNumPackets);
      while (remaining) {
        if (static_cast<int>(remaining) > params.gro) {
          auto tmp = readBuffers[i]->cloneOne();
          // start at offset
          tmp->trimStart(offset);
          // the actual len is len - offset now
          // leave gro bytes
          tmp->trimEnd(len - offset - params.gro);
          DCHECK_EQ(tmp->length(), params.gro);

          offset += params.gro;
          remaining -= params.gro;
          networkData.packets.emplace_back(std::move(tmp));
        } else {
          // do not clone the last packet
          // start at offset, use all the remaining data
          readBuffers[i]->trimStart(offset);
          DCHECK_EQ(readBuffers[i]->length(), remaining);
          remaining = 0;
          networkData.packets.emplace_back(std::move(readBuffers[i]));
        }
      }
    } else {
      networkData.packets.emplace_back(std::move(readBuffers[i]));
    }

    trackDatagramReceived(bytesRead);
  }
  for (; i < numPackets; i++) {
    freeBufs.emplace_back(std::move(readBuffers[i]));
    DCHECK(freeBufs.back() != nullptr);
  }
}