void AsyncUDPSocket::handleRead()

in folly/io/async/AsyncUDPSocket.cpp [1071:1180]


void AsyncUDPSocket::handleRead() noexcept {
  void* buf{nullptr};
  size_t len{0};

  if (handleErrMessages()) {
    return;
  }

  if (fd_ == NetworkSocket()) {
    // The socket may have been closed by the error callbacks.
    return;
  }
  if (readCallback_->shouldOnlyNotify()) {
    return readCallback_->onNotifyDataAvailable(*this);
  }

  size_t numReads = maxReadsPerEvent_ ? maxReadsPerEvent_ : size_t(-1);
  EventBase* originalEventBase = eventBase_;
  while (numReads-- && readCallback_ && eventBase_ == originalEventBase) {
    readCallback_->getReadBuffer(&buf, &len);
    if (buf == nullptr || len == 0) {
      AsyncSocketException ex(
          AsyncSocketException::BAD_ARGS,
          "AsyncUDPSocket::getReadBuffer() returned empty buffer");

      auto cob = readCallback_;
      readCallback_ = nullptr;

      cob->onReadError(ex);
      updateRegistration();
      return;
    }

    struct sockaddr_storage addrStorage;
    socklen_t addrLen = sizeof(addrStorage);
    memset(&addrStorage, 0, size_t(addrLen));
    auto rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
    rawAddr->sa_family = localAddress_.getFamily();

    ssize_t bytesRead;
    ReadCallback::OnDataAvailableParams params;

#ifdef FOLLY_HAVE_MSG_ERRQUEUE
    bool use_gro = gro_.has_value() && (gro_.value() > 0);
    bool use_ts = ts_.has_value() && (ts_.value() > 0);
    if (use_gro || use_ts) {
      char control[ReadCallback::OnDataAvailableParams::kCmsgSpace] = {};

      struct msghdr msg = {};
      struct iovec iov = {};

      iov.iov_base = buf;
      iov.iov_len = len;

      msg.msg_iov = &iov;
      msg.msg_iovlen = 1;

      msg.msg_name = rawAddr;
      msg.msg_namelen = addrLen;

      msg.msg_control = control;
      msg.msg_controllen = sizeof(control);

      bytesRead = netops::recvmsg(fd_, &msg, MSG_TRUNC);

      if (bytesRead >= 0) {
        addrLen = msg.msg_namelen;
        fromMsg(params, msg);
      }
    } else {
      bytesRead = netops::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
    }
#else
    bytesRead = netops::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
#endif
    if (bytesRead >= 0) {
      clientAddress_.setFromSockaddr(rawAddr, addrLen);

      if (bytesRead > 0) {
        bool truncated = false;
        if ((size_t)bytesRead > len) {
          truncated = true;
          bytesRead = ssize_t(len);
        }

        readCallback_->onDataAvailable(
            clientAddress_, size_t(bytesRead), truncated, params);
      }
    } else {
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        // No data could be read without blocking the socket
        return;
      }

      AsyncSocketException ex(
          AsyncSocketException::INTERNAL_ERROR, "::recvfrom() failed", errno);

      // In case of UDP we can continue reading from the socket
      // even if the current request fails. We notify the user
      // so that he can do some logging/stats collection if he wants.
      auto cob = readCallback_;
      readCallback_ = nullptr;

      cob->onReadError(ex);
      updateRegistration();

      return;
    }
  }
}