in src/io/network/endpoint.cc [421:510]
void NetworkThread::afterConnEst(EndPoint *ep, int fd, bool active) {
if (active)
LOG(INFO) << "Connected to " << inet_ntoa(ep->addr_.sin_addr)
<< ", fd = " << fd;
int sfd;
if (active) {
ep->fd_[0] = fd;
sfd = ep->fd_[1];
} else {
if (ep->fd_[1] >= 0) {
// the previous connection is lost
handleConnLost(ep->fd_[1], ep, false);
}
ep->fd_[1] = fd;
sfd = ep->fd_[0];
}
if (sfd == fd) {
// this fd is a reuse of a previous socket fd
// so we first need to clean the resouce for that fd
// we duplicate this fd to let the resouce of the oldf fd can be freed
// also indicate there is no need to reconnect
fd = dup(fd);
handleConnLost(sfd, ep, false);
}
// initialize io watchers and add the read watcher to the ev loop
ev_io_init(&fd_rwatcher_map_[fd], readable_cb, fd, EV_READ);
ev_io_start(loop_, &fd_rwatcher_map_[fd]);
// stop watching the writable watcher if necessary
if (active)
ev_io_stop(loop_, &fd_wwatcher_map_[fd]);
ev_io_init(&fd_wwatcher_map_[fd], writable_cb, fd, EV_WRITE);
ep->last_msg_time_ = ev_now(loop_);
// see whether there is already a established connection for this fd
if (ep->conn_status_ == CONN_EST && sfd >= 0) {
// check if fd and sfd are associate with the same socket
struct sockaddr_in addr;
socklen_t len;
if (getsockname(fd, (struct sockaddr *)&addr, &len)) {
LOG(INFO) << "Unable to get local socket address: " << strerror(errno);
} else {
// see whether the local address of fd is the same as the remote side
// of sfd, which has already been stored in ep->addr_
if (addr.sin_addr.s_addr == ep->addr_.sin_addr.s_addr &&
addr.sin_port == ep->addr_.sin_port) {
LOG(INFO) << fd << " and " << sfd
<< " are associated with the same socket";
ep->is_socket_loop_ = true;
} else {
// this socket is redundant, we close it maunally if the local ip
// is smaller than the peer ip
if ((addr.sin_addr.s_addr < ep->addr_.sin_addr.s_addr) ||
(addr.sin_addr.s_addr == ep->addr_.sin_addr.s_addr &&
addr.sin_port < ep->addr_.sin_port))
handleConnLost(fd, ep, false);
}
}
} else {
ep->pfd_ = fd; // set the primary fd
ep->conn_status_ = CONN_EST;
// start timeout watcher to detect the liveness of EndPoint
ev_init(&ep->timer_, timeout_cb);
ep->timer_.repeat = EP_TIMEOUT;
ev_timer_start(loop_, &ep->timer_);
// timeout_cb(loop_, &ep->timer_, EV_TIMER);
}
if (fd == ep->pfd_) {
this->asyncSendPendingMsg(ep);
}
fd_ep_map_[fd] = ep;
// Finally notify all waiting threads
// if this connection is initiaed by remote side,
// we dont need to notify the waiting thread
// later threads wanting to send to this ep, however,
// are able to reuse this ep
if (active) {
ep->cv_.notify_all();
}
}