in src/io/network/endpoint.cc [562:650]
int NetworkThread::asyncSend(int fd) {
// EndPoint* ep = epf_->getEp(fd_ip_map_[fd]);
CHECK(fd_ep_map_.count(fd) > 0);
EndPoint *ep = fd_ep_map_.at(fd);
std::unique_lock<std::mutex> ep_lock(ep->mtx_);
if (fd != ep->pfd_)
// we only send over the primary fd
// return -1 to indicate this fd is redundant
return ep->is_socket_loop_ ? 0 : -1;
if (ep->conn_status_ != CONN_EST)
// This happens during reconnection
goto out;
while (!ep->send_.empty()) {
Message &msg = *ep->send_.front();
int nbytes;
while (msg.processed_ < msg.getSize()) {
if (msg.type_ == MSG_ACK) {
nbytes = write(fd, msg.mdata_ + msg.processed_,
msg.getSize() - msg.processed_);
} else
nbytes = write(fd, msg.msg_ + msg.processed_,
msg.getSize() - msg.processed_);
if (nbytes == -1) {
if (errno == EWOULDBLOCK) {
if (!ev_is_active(&fd_wwatcher_map_[fd]) &&
!ev_is_pending(&fd_wwatcher_map_[fd]))
ev_io_start(loop_, &fd_wwatcher_map_[fd]);
goto out;
} else {
// this connection is lost; reset the send status
// so that next time the whole msg would be sent entirely
msg.processed_ = 0;
goto err;
}
} else {
ep->last_msg_time_ = ev_now(loop_);
msg.processed_ += nbytes;
}
// std::size_t m, p;
// uint8_t type;
// uint32_t id;
// if (msg.msg_) {
// readInteger(msg.msg_, type, id, m, p);
// LOG(INFO) << "Send " << msg.processed_ << " bytes to " <<
// inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd << " for the current
// DATA MSG " << msg.id_ << ", " << id << ", " << m << ", " << p;
//}
}
CHECK(msg.processed_ == msg.getSize());
if (msg.type_ != MSG_ACK) {
LOG(INFO) << "Send a DATA message to " << inet_ntoa(ep->addr_.sin_addr)
<< " for MSG " << msg.id_ << ", len = " << msg.getSize()
<< " over fd " << fd;
msg.processed_ = 0;
ep->to_ack_.push(&msg);
} else {
// LOG(INFO) << "Send an ACK message to " << inet_ntoa(ep->addr_.sin_addr)
// << " for MSG " << msg.id_;
delete &msg;
}
ep->send_.pop();
// for test
// if (ep->retry_cnt_ == 0) {
// LOG(INFO) << "Disconnect with Endpoint " <<
// inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
// close(fd);
// goto err;
// }
}
out:
if (ep->send_.empty())
ev_io_stop(loop_, &this->fd_wwatcher_map_[fd]);
return 0;
err:
return -1;
}