in gloo/transport/tcp/pair.cc [198:280]
void Pair::connect(const Address& peer) {
std::unique_lock<std::mutex> lock(m_);
int rv;
socklen_t addrlen;
throwIfException();
peer_ = peer;
const auto& selfAddr = self_.getSockaddr();
const auto& peerAddr = peer_.getSockaddr();
// Addresses have to have same family
if (selfAddr.ss_family != peerAddr.ss_family) {
GLOO_THROW_INVALID_OPERATION_EXCEPTION("address family mismatch");
}
if (selfAddr.ss_family == AF_INET) {
struct sockaddr_in* sa = (struct sockaddr_in*)&selfAddr;
struct sockaddr_in* sb = (struct sockaddr_in*)&peerAddr;
addrlen = sizeof(struct sockaddr_in);
rv = memcmp(&sa->sin_addr, &sb->sin_addr, sizeof(struct in_addr));
if (rv == 0) {
rv = sa->sin_port - sb->sin_port;
}
} else if (peerAddr.ss_family == AF_INET6) {
struct sockaddr_in6* sa = (struct sockaddr_in6*)&selfAddr;
struct sockaddr_in6* sb = (struct sockaddr_in6*)&peerAddr;
addrlen = sizeof(struct sockaddr_in6);
rv = memcmp(&sa->sin6_addr, &sb->sin6_addr, sizeof(struct in6_addr));
if (rv == 0) {
rv = sa->sin6_port - sb->sin6_port;
}
} else {
GLOO_THROW_INVALID_OPERATION_EXCEPTION("unknown sa_family");
}
if (rv == 0) {
GLOO_THROW_INVALID_OPERATION_EXCEPTION("cannot connect to self");
}
is_client_ = rv > 0;
// self_ < peer_; we are listening side.
if (!is_client_) {
waitUntilConnected(lock, true);
return;
}
// self_ > peer_; we are connecting side.
// First destroy listening socket.
device_->unregisterDescriptor(fd_, this);
::close(fd_);
// Create new socket to connect to peer.
fd_ = socket(peerAddr.ss_family, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (fd_ == -1) {
signalAndThrowException(GLOO_ERROR_MSG("socket: ", strerror(errno)));
}
// Set SO_REUSEADDR to signal that reuse of the source port is OK.
int on = 1;
rv = setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
if (rv == -1) {
::close(fd_);
fd_ = FD_INVALID;
signalAndThrowException(GLOO_ERROR_MSG("setsockopt: ", strerror(errno)));
}
// Connect to peer
rv = ::connect(fd_, (struct sockaddr*)&peerAddr, addrlen);
if (rv == -1 && errno != EINPROGRESS) {
::close(fd_);
fd_ = FD_INVALID;
signalAndThrowException(GLOO_ERROR_MSG("connect: ", strerror(errno)));
}
// Register with device so we're called when connection completes.
changeState(CONNECTING);
device_->registerDescriptor(fd_, EPOLLIN | EPOLLOUT, this);
// Wait for connection to complete
waitUntilConnected(lock, true);
}