void NetworkThread::afterConnEst()

in src/io/network/endpoint.cc [421:510]


void NetworkThread::afterConnEst(EndPoint *ep, int fd, bool active) {

  if (active)
    LOG(INFO) << "Connected to " << inet_ntoa(ep->addr_.sin_addr)
              << ", fd = " << fd;

  int sfd;

  if (active) {
    ep->fd_[0] = fd;
    sfd = ep->fd_[1];
  } else {
    if (ep->fd_[1] >= 0) {
      // the previous connection is lost
      handleConnLost(ep->fd_[1], ep, false);
    }
    ep->fd_[1] = fd;
    sfd = ep->fd_[0];
  }

  if (sfd == fd) {
    // this fd is a reuse of a previous socket fd
    // so we first need to clean the resouce for that fd
    // we duplicate this fd to let the resouce of the oldf fd can be freed
    // also indicate there is no need to reconnect
    fd = dup(fd);
    handleConnLost(sfd, ep, false);
  }

  // initialize io watchers and add the read watcher to the ev loop
  ev_io_init(&fd_rwatcher_map_[fd], readable_cb, fd, EV_READ);
  ev_io_start(loop_, &fd_rwatcher_map_[fd]);

  // stop watching the writable watcher if necessary
  if (active)
    ev_io_stop(loop_, &fd_wwatcher_map_[fd]);
  ev_io_init(&fd_wwatcher_map_[fd], writable_cb, fd, EV_WRITE);

  ep->last_msg_time_ = ev_now(loop_);

  // see whether there is already a established connection for this fd
  if (ep->conn_status_ == CONN_EST && sfd >= 0) {
    // check if fd and sfd are associate with the same socket
    struct sockaddr_in addr;
    socklen_t len;
    if (getsockname(fd, (struct sockaddr *)&addr, &len)) {
      LOG(INFO) << "Unable to get local socket address: " << strerror(errno);
    } else {
      // see whether the local address of fd is the same as the remote side
      // of sfd, which has already been stored in ep->addr_
      if (addr.sin_addr.s_addr == ep->addr_.sin_addr.s_addr &&
          addr.sin_port == ep->addr_.sin_port) {
        LOG(INFO) << fd << " and " << sfd
                  << " are associated with the same socket";
        ep->is_socket_loop_ = true;
      } else {
        // this socket is redundant, we close it maunally if the local ip
        // is smaller than the peer ip
        if ((addr.sin_addr.s_addr < ep->addr_.sin_addr.s_addr) ||
            (addr.sin_addr.s_addr == ep->addr_.sin_addr.s_addr &&
             addr.sin_port < ep->addr_.sin_port))
          handleConnLost(fd, ep, false);
      }
    }
  } else {
    ep->pfd_ = fd; // set the primary fd
    ep->conn_status_ = CONN_EST;

    // start timeout watcher to detect the liveness of EndPoint
    ev_init(&ep->timer_, timeout_cb);
    ep->timer_.repeat = EP_TIMEOUT;
    ev_timer_start(loop_, &ep->timer_);
    // timeout_cb(loop_, &ep->timer_, EV_TIMER);
  }

  if (fd == ep->pfd_) {
    this->asyncSendPendingMsg(ep);
  }

  fd_ep_map_[fd] = ep;

  // Finally notify all waiting threads
  // if this connection is initiaed by remote side,
  // we dont need to notify the waiting thread
  // later threads wanting to send to this ep, however,
  // are able to reuse this ep
  if (active) {
    ep->cv_.notify_all();
  }
}