in folly/io/async/AsyncSocket.cpp [2526:2639]
/**
 * Drain the socket's error queue and dispatch every control message found.
 *
 * Each control message is handled as follows:
 *   - if isZeroCopyMsg() accepts it, it is passed to processZeroCopyMsg()
 *     and nothing else sees it;
 *   - otherwise, if a ByteEventHelper is present, it is offered to
 *     ByteEventHelper::processCmsg() and any resulting ByteEvent is forwarded
 *     to the lifecycle observers whose config has byteEvents set;
 *   - finally, it is handed to errMessageCallback_ when one is installed.
 *
 * The method has a non-empty implementation only on platforms that define
 * FOLLY_HAVE_MSG_ERRQUEUE (i.e., that support per-socket error queues).
 *
 * @return the number of control messages that were looked at; 0 when there
 *         is no consumer for them (no callback, no pending zero-copy
 *         buffers, ByteEvents disabled) or when the platform has no error
 *         queue support.
 */
size_t AsyncSocket::handleErrMessages() noexcept {
  // This method has non-empty implementation only for platforms
  // supporting per-socket error queues.
  VLOG(5) << "AsyncSocket::handleErrMessages() this=" << this << ", fd=" << fd_
          << ", state=" << state_;
  if (errMessageCallback_ == nullptr && idZeroCopyBufPtrMap_.empty() &&
      (!byteEventHelper_ || !byteEventHelper_->byteEventsEnabled)) {
    VLOG(7) << "AsyncSocket::handleErrMessages(): "
            << "no err message callback installed and "
            << "ByteEvents not enabled - exiting.";
    return 0;
  }

#ifdef FOLLY_HAVE_MSG_ERRQUEUE
  // CMSG_FIRSTHDR()/CMSG_NXTHDR() reinterpret this buffer as cmsghdr objects,
  // so it must be aligned accordingly.
  alignas(struct cmsghdr) uint8_t ctrl[1024];
  unsigned char data;
  struct msghdr msg;
  iovec entry;

  entry.iov_base = &data;
  entry.iov_len = sizeof(data);
  msg.msg_iov = &entry;
  msg.msg_iovlen = 1;
  msg.msg_name = nullptr;
  msg.msg_namelen = 0;
  msg.msg_control = ctrl;
  msg.msg_controllen = sizeof(ctrl);
  msg.msg_flags = 0;

  size_t num = 0;
  // the socket may be closed by errMessage callback, so check on each iteration
  while (fd_ != NetworkSocket()) {
    // msg_controllen is an in/out field: on return recvmsg() overwrites it
    // with the amount of ancillary data actually written. Restore the full
    // capacity before every call, otherwise each subsequent read would be
    // limited to the size of the previous message's control data.
    msg.msg_controllen = sizeof(ctrl);

    int ret = netops_->recvmsg(fd_, &msg, MSG_ERRQUEUE);
    // Snapshot errno right after the syscall, before any logging runs, so
    // both the EAGAIN check and the reported code reflect recvmsg() itself.
    const int recvErrno = errno;
    VLOG(5) << "AsyncSocket::handleErrMessages(): recvmsg returned " << ret;

    if (ret < 0) {
      // EAGAIN means the error queue has been drained; anything else is a
      // genuine failure that is reported through failErrMessageRead().
      if (recvErrno != EAGAIN) {
        LOG(ERROR) << "::recvmsg exited with code " << ret
                   << ", errno: " << recvErrno << ", fd: " << fd_;
        AsyncSocketException ex(
            AsyncSocketException::INTERNAL_ERROR,
            withAddr("recvmsg() failed"),
            recvErrno);
        failErrMessageRead(__func__, ex);
      }
      return num;
    }

    for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
         cmsg != nullptr && cmsg->cmsg_len != 0;
         cmsg = CMSG_NXTHDR(&msg, cmsg)) {
      ++num;
      if (isZeroCopyMsg(*cmsg)) {
        processZeroCopyMsg(*cmsg);
        continue;
      }

      // try to process it as a ByteEvent and forward to observers
      //
      // observers cannot throw and thus we expect only exceptions from
      // ByteEventHelper, but we guard against other cases for safety
      if (byteEventHelper_) {
        try {
          if (const auto maybeByteEvent =
                  byteEventHelper_->processCmsg(*cmsg, getRawBytesWritten())) {
            const auto& byteEvent = maybeByteEvent.value();
            for (const auto& observer : lifecycleObservers_) {
              if (observer->getConfig().byteEvents) {
                observer->byteEvent(this, byteEvent);
              }
            }
          }
        } catch (const ByteEventHelper::Exception& behEx) {
          // rewrap the ByteEventHelper::Exception with extra information
          AsyncSocketException ex(
              AsyncSocketException::INTERNAL_ERROR,
              withAddr(
                  string("AsyncSocket::handleErrMessages(), "
                         "internal exception during ByteEvent processing: ") +
                  behEx.what()));
          failByteEvents(ex);
        } catch (const std::exception& ex) {
          AsyncSocketException tex(
              AsyncSocketException::UNKNOWN,
              string("AsyncSocket::handleErrMessages(), "
                     "unhandled exception during ByteEvent processing, "
                     "threw exception: ") +
                  ex.what());
          failByteEvents(tex);
        } catch (...) {
          AsyncSocketException tex(
              AsyncSocketException::UNKNOWN,
              string("AsyncSocket::handleErrMessages(), "
                     "unhandled exception during ByteEvent processing, "
                     "threw non-exception type"));
          failByteEvents(tex);
        }
      }

      // even if it is a timestamp, hand it off to the errMessageCallback,
      // the application may want it as well.
      if (errMessageCallback_) {
        errMessageCallback_->errMessage(*cmsg);
      }
    }
  }
  return num;
#else
  return 0;
#endif // FOLLY_HAVE_MSG_ERRQUEUE
}