void TSocket::openConnection()

in thrift/lib/cpp/transport/TSocket.cpp [138:313]


/**
 * Creates socket_ and connects it, either to the Unix domain socket named by
 * path_ (when path_ is non-empty) or to the address described by res.
 *
 * When options_.connTimeout is positive the socket is switched to
 * non-blocking mode for the duration of the connect and poll() is used to
 * wait for completion; otherwise connect() is issued on a blocking socket.
 * On success the file status flags read at the start are written back and,
 * for the non-Unix case, the peer address is cached via setCachedAddress().
 *
 * @param res  Address to connect to. Only dereferenced when path_ is empty.
 *
 * @throws TTransportException  ALREADY_OPEN if isOpen() is true on entry;
 *         NOT_OPEN if socket(), fcntl(), connect(), poll() or getsockopt()
 *         fails, if the Unix socket path does not fit in sun_path, if the
 *         socket reports a pending error, or if the connect times out.
 *
 * NOTE(review): none of the failure paths here close socket_; presumably the
 * caller is responsible for that -- confirm against the call sites.
 */
void TSocket::openConnection(struct addrinfo* res) {
  apache::thrift::util::PausableTimer pausableTimer(options_.connTimeout);

  if (isOpen()) {
    throw TTransportException(TTransportException::ALREADY_OPEN);
  }

  if (!path_.empty()) {
    socket_ = fsp::socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
  } else {
    socket_ = fsp::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  }
  if (socket_ == -1) {
    const int errno_copy = errno;
    GlobalOutput.perror(
        "TSocket::open() socket() " + getSocketInfo(), errno_copy);
    throw TTransportException(
        TTransportException::NOT_OPEN, "socket()", errno_copy);
  }

  setSocketOptions(options_);

  // Uses a low min RTO if asked to.
#ifdef TCP_LOW_MIN_RTO
  if (getUseLowMinRto()) {
    int one = 1;
    // Best effort: a failure to set this option is deliberately ignored.
    setsockopt(socket_, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
  }
#endif

  // Remember the current file status flags so they can be restored once the
  // connection is established. A failed F_GETFL must not be used as a flag
  // mask: -1 would turn on every bit in the F_SETFL calls below.
  const int flags = fcntl(socket_, F_GETFL, 0);
  if (flags == -1) {
    const int errno_copy = errno;
    GlobalOutput.perror(
        "TSocket::open() fcntl() " + getSocketInfo(), errno_copy);
    throw TTransportException(
        TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
  }

  // Set the socket to be non blocking for connect if a timeout exists
  if (options_.connTimeout > 0) {
    if (-1 == fcntl(socket_, F_SETFL, flags | O_NONBLOCK)) {
      const int errno_copy = errno;
      GlobalOutput.perror(
          "TSocket::open() fcntl() " + getSocketInfo(), errno_copy);
      throw TTransportException(
          TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
    }
  } else {
    if (-1 == fcntl(socket_, F_SETFL, flags & ~O_NONBLOCK)) {
      const int errno_copy = errno;
      GlobalOutput.perror(
          "TSocket::open() fcntl " + getSocketInfo(), errno_copy);
      throw TTransportException(
          TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
    }
  }

  if (options_.reuseAddr) {
    int val = 1;
    if (-1 ==
        setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val))) {
      const int errno_copy =
          errno; // Copy errno because we're allocating memory.
      GlobalOutput.perror(
          "TSocket::open() setsockopt(SO_REUSEADDR) " + getSocketInfo(),
          errno_copy);
      // No need to throw.
    }
  }

  // Connect the socket
  int ret;
  if (!path_.empty()) {
    // Zero the whole structure so that no uninitialized bytes are handed to
    // connect() along with the path.
    struct sockaddr_un address {};

    // +1 so that the terminating NUL is copied as well.
    const size_t len = path_.size() + 1;
    if (len > sizeof(address.sun_path)) {
      // No system call failed here, so errno holds nothing relevant; log the
      // error code that actually describes the condition.
      GlobalOutput.perror(
          "TSocket::open() Unix Domain socket path too long", ENAMETOOLONG);
      throw TTransportException(
          TTransportException::NOT_OPEN, " Unix Domain socket path too long");
    }

    address.sun_family = AF_UNIX;
    memcpy(address.sun_path, path_.c_str(), len);
    const socklen_t structlen = static_cast<socklen_t>(sizeof(address));
    ret = connect(
        socket_, reinterpret_cast<struct sockaddr*>(&address), structlen);
  } else {
    ret = connect(socket_, res->ai_addr, static_cast<int>(res->ai_addrlen));
  }

  if (ret != 0) {
    const int connect_errno = errno;
    if (connect_errno != EINPROGRESS) {
      GlobalOutput.perror(
          "TSocket::open() connect() " + getSocketInfo(), connect_errno);
      throw TTransportException(
          TTransportException::NOT_OPEN,
          "connect() failed " + getSocketInfo(),
          connect_errno);
    }

    // The connect is in progress: wait until the socket becomes writable.
    //
    // When there is a poll timeout set, an EINTR will restart the
    // poll() and hence reset the timer. So if we receive EINTRs at a
    // faster rate than the timeout value, the timeout will never
    // trigger. Therefore, we keep track of the total amount of time
    // we've spent in poll(), and if this value exceeds the timeout then
    // we stop retrying on EINTR. Note that we might still exceed the
    // timeout, but by at most a factor of 2.
    for (;;) {
      struct pollfd fds[1];
      std::memset(fds, 0, sizeof(fds));
      fds[0].fd = socket_;
      fds[0].events = POLLOUT;

      pausableTimer.start();
      ret = poll(fds, 1, options_.connTimeout);
      // gettimeofday, used by PausableTimer, can change errno
      const int errno_after_poll = errno;
      pausableTimer.stop();

      if (ret > 0) {
        // Ensure the socket is connected and that there are no errors set
        int val = 0;
        socklen_t lon = sizeof(val);
        if (getsockopt(socket_, SOL_SOCKET, SO_ERROR, &val, &lon) == -1) {
          // Report the errno of getsockopt() itself; poll() succeeded, so
          // errno_after_poll says nothing about this failure.
          const int errno_copy = errno;
          GlobalOutput.perror(
              "TSocket::open() getsockopt() " + getSocketInfo(), errno_copy);
          throw TTransportException(
              TTransportException::NOT_OPEN, "getsockopt()", errno_copy);
        }
        if (val != 0) {
          GlobalOutput.perror(
              "TSocket::open() error on socket (after poll) " +
                  getSocketInfo(),
              val);
          throw TTransportException(
              TTransportException::NOT_OPEN, "socket open() error", val);
        }
        // no errors on socket, go to town
        break;
      }

      if (ret == 0) {
        // socket timed out
        string errStr = "TSocket::open() timed out " + getSocketInfo();
        GlobalOutput(errStr.c_str());
        throw TTransportException(
            TTransportException::NOT_OPEN,
            "open() timed out " + getSocketInfo());
      }

      // poll() failed.
      if (errno_after_poll != EINTR) {
        GlobalOutput.perror(
            "TSocket::open() poll() " + getSocketInfo(), errno_after_poll);
        throw TTransportException(
            TTransportException::NOT_OPEN, "poll() failed", errno_after_poll);
      }

      // If interrupted, try again, but only if we haven't exceeded the
      // timeout value yet.
      if (pausableTimer.hasExceededTimeLimit()) {
        GlobalOutput.perror(
            "TSocket::open() poll() (EINTRs, then timed out) " +
                getSocketInfo(),
            errno_after_poll);
        throw TTransportException(
            TTransportException::NOT_OPEN,
            "poll() failed (EINTRs, then timed out)",
            errno_after_poll);
      }
    }
  }

  // Set socket back to normal mode (the flags it had before the connect).
  // Best effort: the return value is deliberately not checked.
  fcntl(socket_, F_SETFL, flags);

  if (path_.empty()) {
    setCachedAddress(res->ai_addr, res->ai_addrlen);
  }
}