in thrift/lib/cpp/transport/TSocket.cpp [138:313]
/**
 * Creates socket_ and connects it, honouring options_.connTimeout.
 *
 * If path_ is non-empty a PF_UNIX stream socket is created and connected to
 * that path; otherwise the socket is created from, and connected to, the
 * address described by `res`.
 *
 * When options_.connTimeout > 0 the socket is switched to non-blocking mode
 * for the duration of the connect and poll() is used to wait for completion.
 * The file status flags read at the start are written back before returning.
 *
 * @param res  Address to connect to. Only dereferenced when path_ is empty.
 *
 * @throws TTransportException(ALREADY_OPEN) if isOpen() is already true.
 * @throws TTransportException(NOT_OPEN) if socket(), fcntl(), connect(),
 *         poll() or getsockopt() fails, if the Unix domain socket path does
 *         not fit in sockaddr_un::sun_path, or if the connect times out.
 *
 * NOTE(review): when this function throws after socket() succeeded, socket_
 * still holds the descriptor; presumably the caller closes it -- confirm.
 */
void TSocket::openConnection(struct addrinfo* res) {
  apache::thrift::util::PausableTimer pausableTimer(options_.connTimeout);
  int errno_after_poll;

  if (isOpen()) {
    throw TTransportException(TTransportException::ALREADY_OPEN);
  }

  if (!path_.empty()) {
    socket_ = fsp::socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
  } else {
    socket_ = fsp::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  }

  if (socket_ == -1) {
    int errno_copy = errno;
    GlobalOutput.perror(
        "TSocket::open() socket() " + getSocketInfo(), errno_copy);
    throw TTransportException(
        TTransportException::NOT_OPEN, "socket()", errno_copy);
  }

  setSocketOptions(options_);

  // Uses a low min RTO if asked to.
#ifdef TCP_LOW_MIN_RTO
  if (getUseLowMinRto()) {
    int one = 1;
    setsockopt(socket_, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
  }
#endif

  // Remember the current file status flags so they can be restored once the
  // connect has finished. A failed F_GETFL must not be used as a flag mask:
  // -1 has every bit set and would be handed straight to F_SETFL below.
  int flags = fcntl(socket_, F_GETFL, 0);
  if (flags == -1) {
    int errno_copy = errno;
    GlobalOutput.perror(
        "TSocket::open() fcntl(F_GETFL) " + getSocketInfo(), errno_copy);
    throw TTransportException(
        TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
  }

  // Set the socket to be non blocking for connect if a timeout exists
  if (options_.connTimeout > 0) {
    if (-1 == fcntl(socket_, F_SETFL, flags | O_NONBLOCK)) {
      int errno_copy = errno;
      GlobalOutput.perror(
          "TSocket::open() fcntl() " + getSocketInfo(), errno_copy);
      throw TTransportException(
          TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
    }
  } else {
    if (-1 == fcntl(socket_, F_SETFL, flags & ~O_NONBLOCK)) {
      int errno_copy = errno;
      GlobalOutput.perror(
          "TSocket::open() fcntl " + getSocketInfo(), errno_copy);
      throw TTransportException(
          TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
    }
  }

  if (options_.reuseAddr) {
    int val = 1;
    if (-1 ==
        setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val))) {
      int errno_copy = errno; // Copy errno because we're allocating memory.
      GlobalOutput.perror(
          "TSocket::open() setsockopt(SO_REUSEADDR) " + getSocketInfo(),
          errno_copy);
      // No need to throw.
    }
  }

  // Connect the socket
  int ret;
  if (!path_.empty()) {
    // +1 so the terminating NUL is copied into sun_path as well.
    size_t len = path_.size() + 1;
    if (len > sizeof(((sockaddr_un*)nullptr)->sun_path)) {
      // No system call has failed here, so errno holds an unrelated value;
      // log with the error code that actually describes the condition.
      GlobalOutput.perror(
          "TSocket::open() Unix Domain socket path too long", ENAMETOOLONG);
      throw TTransportException(
          TTransportException::NOT_OPEN, " Unix Domain socket path too long");
    }

    struct sockaddr_un address;
    address.sun_family = AF_UNIX;
    memcpy(address.sun_path, path_.c_str(), len);
    socklen_t structlen = static_cast<socklen_t>(sizeof(address));
    ret = connect(socket_, (struct sockaddr*)&address, structlen);
  } else {
    ret = connect(socket_, res->ai_addr, static_cast<int>(res->ai_addrlen));
  }

  // success case
  if (ret == 0) {
    goto done;
  }

  // Anything other than "connect still in progress" is a hard failure.
  if (errno != EINPROGRESS) {
    int errno_copy = errno;
    GlobalOutput.perror(
        "TSocket::open() connect() " + getSocketInfo(), errno_copy);
    throw TTransportException(
        TTransportException::NOT_OPEN,
        "connect() failed " + getSocketInfo(),
        errno_copy);
  }

try_again:
  // Wait for the socket to become writable, which signals that the
  // in-progress connect has completed (successfully or not).
  struct pollfd fds[1];
  std::memset(fds, 0, sizeof(fds));
  fds[0].fd = socket_;
  fds[0].events = POLLOUT;

  // When there is a poll timeout set, an EINTR will restart the
  // poll() and hence reset the timer. So if we receive EINTRs at a
  // faster rate than the timeout value, the timeout will never
  // trigger. Therefore, we keep track of the total amount of time
  // we've spent in poll(), and if this value exceeds the timeout then
  // we stop retrying on EINTR. Note that we might still exceed the
  // timeout, but by at most a factor of 2.
  pausableTimer.start();
  ret = poll(fds, 1, options_.connTimeout);
  errno_after_poll =
      errno; // gettimeofday, used by PausableTimer, can change errno
  pausableTimer.stop();

  if (ret > 0) {
    // Ensure the socket is connected and that there are no errors set
    int val;
    socklen_t lon;
    lon = sizeof(int);
    int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, (void*)&val, &lon);
    if (ret2 == -1) {
      // Report getsockopt()'s own errno. errno_after_poll belongs to a
      // poll() call that succeeded and says nothing about this failure.
      int errno_copy = errno;
      GlobalOutput.perror(
          "TSocket::open() getsockopt() " + getSocketInfo(), errno_copy);
      throw TTransportException(
          TTransportException::NOT_OPEN, "getsockopt()", errno_copy);
    }
    // no errors on socket, go to town
    if (val == 0) {
      goto done;
    }
    // SO_ERROR carries the pending error of the asynchronous connect.
    GlobalOutput.perror(
        "TSocket::open() error on socket (after poll) " + getSocketInfo(), val);
    throw TTransportException(
        TTransportException::NOT_OPEN, "socket open() error", val);
  } else if (ret == 0) {
    // socket timed out
    string errStr = "TSocket::open() timed out " + getSocketInfo();
    GlobalOutput(errStr.c_str());
    throw TTransportException(
        TTransportException::NOT_OPEN, "open() timed out " + getSocketInfo());
  } else {
    // If interrupted, try again, but only if we haven't exceeded the
    // timeout value yet.
    if (errno_after_poll == EINTR) {
      if (pausableTimer.hasExceededTimeLimit()) {
        GlobalOutput.perror(
            "TSocket::open() poll() (EINTRs, then timed out) " +
                getSocketInfo(),
            errno_after_poll);
        throw TTransportException(
            TTransportException::NOT_OPEN,
            "poll() failed (EINTRs, then timed out)",
            errno_after_poll);
      } else {
        goto try_again;
      }
    } else { // error on poll() other than EINTR
      GlobalOutput.perror(
          "TSocket::open() poll() " + getSocketInfo(), errno_after_poll);
      throw TTransportException(
          TTransportException::NOT_OPEN, "poll() failed", errno_after_poll);
    }
  }

done:
  // Restore the file status flags the socket had before the connect attempt.
  fcntl(socket_, F_SETFL, flags);

  // Only network (non-Unix-domain) connections cache the peer address.
  if (path_.empty()) {
    setCachedAddress(res->ai_addr, res->ai_addrlen);
  }
}