void Pair::connect()

in gloo/transport/tcp/pair.cc [198:280]


void Pair::connect(const Address& peer) {
  std::unique_lock<std::mutex> lock(m_);
  int rv;
  socklen_t addrlen;
  throwIfException();

  peer_ = peer;

  const auto& selfAddr = self_.getSockaddr();
  const auto& peerAddr = peer_.getSockaddr();

  // Addresses have to have same family
  if (selfAddr.ss_family != peerAddr.ss_family) {
    GLOO_THROW_INVALID_OPERATION_EXCEPTION("address family mismatch");
  }

  if (selfAddr.ss_family == AF_INET) {
    struct sockaddr_in* sa = (struct sockaddr_in*)&selfAddr;
    struct sockaddr_in* sb = (struct sockaddr_in*)&peerAddr;
    addrlen = sizeof(struct sockaddr_in);
    rv = memcmp(&sa->sin_addr, &sb->sin_addr, sizeof(struct in_addr));
    if (rv == 0) {
      rv = sa->sin_port - sb->sin_port;
    }
  } else if (peerAddr.ss_family == AF_INET6) {
    struct sockaddr_in6* sa = (struct sockaddr_in6*)&selfAddr;
    struct sockaddr_in6* sb = (struct sockaddr_in6*)&peerAddr;
    addrlen = sizeof(struct sockaddr_in6);
    rv = memcmp(&sa->sin6_addr, &sb->sin6_addr, sizeof(struct in6_addr));
    if (rv == 0) {
      rv = sa->sin6_port - sb->sin6_port;
    }
  } else {
    GLOO_THROW_INVALID_OPERATION_EXCEPTION("unknown sa_family");
  }

  if (rv == 0) {
    GLOO_THROW_INVALID_OPERATION_EXCEPTION("cannot connect to self");
  }

  is_client_ = rv > 0;

  // self_ < peer_; we are listening side.
  if (!is_client_) {
    waitUntilConnected(lock, true);
    return;
  }

  // self_ > peer_; we are connecting side.
  // First destroy listening socket.
  device_->unregisterDescriptor(fd_, this);
  ::close(fd_);

  // Create new socket to connect to peer.
  fd_ = socket(peerAddr.ss_family, SOCK_STREAM | SOCK_NONBLOCK, 0);
  if (fd_ == -1) {
    signalAndThrowException(GLOO_ERROR_MSG("socket: ", strerror(errno)));
  }

  // Set SO_REUSEADDR to signal that reuse of the source port is OK.
  int on = 1;
  rv = setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
  if (rv == -1) {
    ::close(fd_);
    fd_ = FD_INVALID;
    signalAndThrowException(GLOO_ERROR_MSG("setsockopt: ", strerror(errno)));
  }

  // Connect to peer
  rv = ::connect(fd_, (struct sockaddr*)&peerAddr, addrlen);
  if (rv == -1 && errno != EINPROGRESS) {
    ::close(fd_);
    fd_ = FD_INVALID;
    signalAndThrowException(GLOO_ERROR_MSG("connect: ", strerror(errno)));
  }

  // Register with device so we're called when connection completes.
  changeState(CONNECTING);
  device_->registerDescriptor(fd_, EPOLLIN | EPOLLOUT, this);

  // Wait for connection to complete
  waitUntilConnected(lock, true);
}