ssize_t AsyncUDPSocket::writeChain()

in folly/io/async/AsyncUDPSocket.cpp [480:579]


ssize_t AsyncUDPSocket::writeChain(
    const folly::SocketAddress& address,
    std::unique_ptr<folly::IOBuf>&& buf,
    WriteOptions options) {
  int msg_flags = options.zerocopy ? getZeroCopyFlags() : 0;
  iovec vec[16];
  size_t iovec_len = buf->fillIov(vec, sizeof(vec) / sizeof(vec[0])).numIovecs;
  if (UNLIKELY(iovec_len == 0)) {
    buf->coalesce();
    vec[0].iov_base = const_cast<uint8_t*>(buf->data());
    vec[0].iov_len = buf->length();
    iovec_len = 1;
  }
  CHECK_NE(NetworkSocket(), fd_) << "Socket not yet bound";
  sockaddr_storage addrStorage;
  address.getAddress(&addrStorage);

  struct msghdr msg;
  if (!connected_) {
    msg.msg_name = reinterpret_cast<void*>(&addrStorage);
    msg.msg_namelen = address.getActualSize();
  } else {
    if (connectedAddress_ != address) {
      errno = ENOTSUP;
      return -1;
    }
    msg.msg_name = nullptr;
    msg.msg_namelen = 0;
  }
  msg.msg_iov = const_cast<struct iovec*>(vec);
  msg.msg_iovlen = iovec_len;
  msg.msg_control = nullptr;
  msg.msg_controllen = 0;
  msg.msg_flags = 0;

#ifdef FOLLY_HAVE_MSG_ERRQUEUE
  char control
      [CMSG_SPACE(sizeof(uint16_t)) + /*gso*/
       CMSG_SPACE(sizeof(uint64_t)) /*txtime*/
  ] = {};
  msg.msg_control = control;
  struct cmsghdr* cm = nullptr;
  if (options.gso > 0) {
    msg.msg_controllen = CMSG_SPACE(sizeof(uint16_t));
    cm = CMSG_FIRSTHDR(&msg);

    cm->cmsg_level = SOL_UDP;
    cm->cmsg_type = UDP_SEGMENT;
    cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
    auto gso_len = static_cast<uint16_t>(options.gso);
    memcpy(CMSG_DATA(cm), &gso_len, sizeof(gso_len));
  }

  if (options.txTime.count() > 0 && txTime_.has_value() &&
      (txTime_.value().clockid >= 0)) {
    msg.msg_controllen += CMSG_SPACE(sizeof(uint64_t));
    if (cm) {
      cm = CMSG_NXTHDR(&msg, cm);
    } else {
      cm = CMSG_FIRSTHDR(&msg);
    }
    cm->cmsg_level = SOL_SOCKET;
    cm->cmsg_type = SCM_TXTIME;
    cm->cmsg_len = CMSG_LEN(sizeof(uint64_t));

    struct timespec ts;
    clock_gettime(txTime_.value().clockid, &ts);
    uint64_t txtime = ts.tv_sec * 1000000000ULL + ts.tv_nsec +
        std::chrono::nanoseconds(options.txTime).count();
    memcpy(CMSG_DATA(cm), &txtime, sizeof(txtime));
  }
#else
  CHECK_LT(options.gso, 1) << "GSO not supported";
  CHECK_LT(options.txTime.count(), 1) << "TX_TIME not supported";
#endif

  auto ret = sendmsg(fd_, &msg, msg_flags);
  if (msg_flags) {
    if (ret < 0) {
      if (errno == ENOBUFS) {
        LOG(INFO) << "ENOBUFS...";
        // workaround for running with zerocopy enabled but without a big enough
        // memlock value - see ulimit -l
        // Also see /proc/sys/net/core/optmem_max
        zeroCopyEnabled_ = false;
        zeroCopyReenableCounter_ = zeroCopyReenableThreshold_;

        ret = sendmsg(fd_, &msg, 0);
      }
    } else {
      addZeroCopyBuf(std::move(buf));
    }
  }

  if (ioBufFreeFunc_ && buf) {
    ioBufFreeFunc_(std::move(buf));
  }

  return ret;
} // namespace folly