size_t AsyncSocket::handleErrMessages()

in folly/io/async/AsyncSocket.cpp [2526:2639]


size_t AsyncSocket::handleErrMessages() noexcept {
  // This method has non-empty implementation only for platforms
  // supporting per-socket error queues.
  VLOG(5) << "AsyncSocket::handleErrMessages() this=" << this << ", fd=" << fd_
          << ", state=" << state_;
  if (errMessageCallback_ == nullptr && idZeroCopyBufPtrMap_.empty() &&
      (!byteEventHelper_ || !byteEventHelper_->byteEventsEnabled)) {
    VLOG(7) << "AsyncSocket::handleErrMessages(): "
            << "no err message callback installed and "
            << "ByteEvents not enabled - exiting.";
    return 0;
  }

#ifdef FOLLY_HAVE_MSG_ERRQUEUE
  uint8_t ctrl[1024];
  unsigned char data;
  struct msghdr msg;
  iovec entry;

  entry.iov_base = &data;
  entry.iov_len = sizeof(data);
  msg.msg_iov = &entry;
  msg.msg_iovlen = 1;
  msg.msg_name = nullptr;
  msg.msg_namelen = 0;
  msg.msg_control = ctrl;
  msg.msg_controllen = sizeof(ctrl);
  msg.msg_flags = 0;

  int ret;
  size_t num = 0;
  // the socket may be closed by errMessage callback, so check on each iteration
  while (fd_ != NetworkSocket()) {
    ret = netops_->recvmsg(fd_, &msg, MSG_ERRQUEUE);
    VLOG(5) << "AsyncSocket::handleErrMessages(): recvmsg returned " << ret;

    if (ret < 0) {
      if (errno != EAGAIN) {
        auto errnoCopy = errno;
        LOG(ERROR) << "::recvmsg exited with code " << ret
                   << ", errno: " << errnoCopy << ", fd: " << fd_;
        AsyncSocketException ex(
            AsyncSocketException::INTERNAL_ERROR,
            withAddr("recvmsg() failed"),
            errnoCopy);
        failErrMessageRead(__func__, ex);
      }

      return num;
    }

    for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
         cmsg != nullptr && cmsg->cmsg_len != 0;
         cmsg = CMSG_NXTHDR(&msg, cmsg)) {
      ++num;
      if (isZeroCopyMsg(*cmsg)) {
        processZeroCopyMsg(*cmsg);
        continue;
      }

      // try to process it as a ByteEvent and forward to observers
      //
      // observers cannot throw and thus we expect only exceptions from
      // ByteEventHelper, but we guard against other cases for safety
      if (byteEventHelper_) {
        try {
          if (const auto maybeByteEvent =
                  byteEventHelper_->processCmsg(*cmsg, getRawBytesWritten())) {
            const auto& byteEvent = maybeByteEvent.value();
            for (const auto& observer : lifecycleObservers_) {
              if (observer->getConfig().byteEvents) {
                observer->byteEvent(this, byteEvent);
              }
            }
          }
        } catch (const ByteEventHelper::Exception& behEx) {
          // rewrap the ByteEventHelper::Exception with extra information
          AsyncSocketException ex(
              AsyncSocketException::INTERNAL_ERROR,
              withAddr(
                  string("AsyncSocket::handleErrMessages(), "
                         "internal exception during ByteEvent processing: ") +
                  behEx.what()));
          failByteEvents(ex);
        } catch (const std::exception& ex) {
          AsyncSocketException tex(
              AsyncSocketException::UNKNOWN,
              string("AsyncSocket::handleErrMessages(), "
                     "unhandled exception during ByteEvent processing, "
                     "threw exception: ") +
                  ex.what());
          failByteEvents(tex);
        } catch (...) {
          AsyncSocketException tex(
              AsyncSocketException::UNKNOWN,
              string("AsyncSocket::handleErrMessages(), "
                     "unhandled exception during ByteEvent processing, "
                     "threw non-exception type"));
          failByteEvents(tex);
        }
      }

      // even if it is a timestamp, hand it off to the errMessageCallback,
      // the application may want it as well.
      if (errMessageCallback_) {
        errMessageCallback_->errMessage(*cmsg);
      }
    }
  }
  return num;
#else
  return 0;
#endif // FOLLY_HAVE_MSG_ERRQUEUE
}