void QuicClientTransport::recvMsg()

in quic/client/QuicClientTransport.cpp [1196:1319]


void QuicClientTransport::recvMsg(
    folly::AsyncUDPSocket& sock,
    uint64_t readBufferSize,
    int numPackets,
    NetworkData& networkData,
    folly::Optional<folly::SocketAddress>& server,
    size_t& totalData) {
  for (int packetNum = 0; packetNum < numPackets; ++packetNum) {
    // We create 1 buffer per packet so that it is not shared, this enables
    // us to decrypt in place. If the fizz decrypt api could decrypt in-place
    // even if shared, then we could allocate one giant IOBuf here.
    Buf readBuffer = folly::IOBuf::create(readBufferSize);
    struct iovec vec {};
    vec.iov_base = readBuffer->writableData();
    vec.iov_len = readBufferSize;

    sockaddr* rawAddr{nullptr};
    struct sockaddr_storage addrStorage {};
    socklen_t addrLen{sizeof(addrStorage)};
    if (!server) {
      rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
      rawAddr->sa_family = sock.address().getFamily();
    }

    int flags = 0;
    folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams params;
    struct msghdr msg {};
    msg.msg_name = rawAddr;
    msg.msg_namelen = size_t(addrLen);
    msg.msg_iov = &vec;
    msg.msg_iovlen = 1;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
    bool useGRO = sock.getGRO() > 0;
    bool useTS = sock.getTimestamping() > 0;
    char control[folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams::
                     kCmsgSpace] = {};

    if (useGRO || useTS) {
      msg.msg_control = control;
      msg.msg_controllen = sizeof(control);

      // we need to consider MSG_TRUNC too
      flags |= MSG_TRUNC;
    }
#endif

    ssize_t ret = sock.recvmsg(&msg, flags);
    if (ret < 0) {
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        // If we got a retriable error, let us continue.
        if (conn_->loopDetectorCallback) {
          conn_->readDebugState.noReadReason = NoReadReason::RETRIABLE_ERROR;
        }
        break;
      }
      // If we got a non-retriable error, we might have received
      // a packet that we could process, however let's just quit early.
      sock.pauseRead();
      if (conn_->loopDetectorCallback) {
        conn_->readDebugState.noReadReason = NoReadReason::NONRETRIABLE_ERROR;
      }
      return onReadError(folly::AsyncSocketException(
          folly::AsyncSocketException::INTERNAL_ERROR,
          "::recvmsg() failed",
          errno));
    } else if (ret == 0) {
      break;
    }
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
    if (useGRO) {
      folly::AsyncUDPSocket::fromMsg(params, msg);

      // truncated
      if ((size_t)ret > readBufferSize) {
        ret = readBufferSize;
        if (params.gro > 0) {
          ret = ret - ret % params.gro;
        }
      }
    }
#endif
    size_t bytesRead = size_t(ret);
    totalData += bytesRead;
    if (!server) {
      server = folly::SocketAddress();
      server->setFromSockaddr(rawAddr, addrLen);
    }
    VLOG(10) << "Got data from socket peer=" << *server << " len=" << bytesRead;
    readBuffer->append(bytesRead);
    if (params.gro > 0) {
      size_t len = bytesRead;
      size_t remaining = len;
      size_t offset = 0;
      size_t totalNumPackets =
          networkData.packets.size() + ((len + params.gro - 1) / params.gro);
      networkData.packets.reserve(totalNumPackets);
      while (remaining) {
        if (static_cast<int>(remaining) > params.gro) {
          auto tmp = readBuffer->cloneOne();
          // start at offset
          tmp->trimStart(offset);
          // the actual len is len - offset now
          // leave gro bytes
          tmp->trimEnd(len - offset - params.gro);
          DCHECK_EQ(tmp->length(), params.gro);

          offset += params.gro;
          remaining -= params.gro;
          networkData.packets.emplace_back(std::move(tmp));
        } else {
          // do not clone the last packet
          // start at offset, use all the remaining data
          readBuffer->trimStart(offset);
          DCHECK_EQ(readBuffer->length(), remaining);
          remaining = 0;
          networkData.packets.emplace_back(std::move(readBuffer));
        }
      }
    } else {
      networkData.packets.emplace_back(std::move(readBuffer));
    }
    trackDatagramReceived(bytesRead);
  }
}