int NetworkThread::asyncSend(int fd)

in src/io/network/endpoint.cc [562:650]


// Writes as many queued messages as possible to the endpoint that owns `fd`.
//
// The endpoint is looked up in fd_ep_map_ (the fd must be registered; this is
// CHECKed) and its mutex is held for the whole call. Messages are taken from
// the front of ep->send_ and written with write(); a partially written message
// keeps its progress in msg.processed_ so a later call resumes where this one
// stopped.
//
// After a message has been written completely:
//   - a non-ACK message has processed_ reset to 0 and is pushed onto
//     ep->to_ack_;
//   - an ACK message is deleted.
//
// Returns:
//    0  if the queue was drained, if the socket would block (the write watcher
//       for `fd` is started so sending can resume later), if the connection is
//       not in CONN_EST state, or if `fd` is not the primary fd and the
//       endpoint is a socket loop;
//   -1  if `fd` is not the primary fd of a non-loop endpoint (redundant fd),
//       or if write() failed with an error other than EINTR / EAGAIN /
//       EWOULDBLOCK. In the latter case the current message's progress is
//       reset so that it is resent from the beginning next time.
int NetworkThread::asyncSend(int fd) {

  // EndPoint* ep = epf_->getEp(fd_ip_map_[fd]);
  CHECK(fd_ep_map_.count(fd) > 0);
  EndPoint *ep = fd_ep_map_.at(fd);

  std::unique_lock<std::mutex> ep_lock(ep->mtx_);

  if (fd != ep->pfd_)
    // we only send over the primary fd
    // return -1 to indicate this fd is redundant
    return ep->is_socket_loop_ ? 0 : -1;

  if (ep->conn_status_ != CONN_EST)
    // This happens during reconnection
    goto out;

  while (!ep->send_.empty()) {

    Message &msg = *ep->send_.front();

    while (msg.processed_ < msg.getSize()) {
      // write() returns ssize_t; keep the full width instead of narrowing
      // the result to int.
      ssize_t nbytes;

      // ACK messages are sent from mdata_, everything else from msg_.
      if (msg.type_ == MSG_ACK) {
        nbytes = write(fd, msg.mdata_ + msg.processed_,
                       msg.getSize() - msg.processed_);
      } else {
        nbytes = write(fd, msg.msg_ + msg.processed_,
                       msg.getSize() - msg.processed_);
      }

      if (nbytes == -1) {
        if (errno == EINTR)
          // Interrupted by a signal before any byte was written: this is
          // transient, not a lost connection, so simply retry the write.
          continue;

        // POSIX allows EAGAIN and EWOULDBLOCK to be distinct values, so both
        // are treated as "socket buffer full".
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
          // Arm the write watcher (unless it is already armed) so that we
          // get called again once the socket becomes writable.
          if (!ev_is_active(&fd_wwatcher_map_[fd]) &&
              !ev_is_pending(&fd_wwatcher_map_[fd]))
            ev_io_start(loop_, &fd_wwatcher_map_[fd]);
          goto out;
        }

        // this connection is lost; reset the send status
        // so that next time the whole msg would be sent entirely
        msg.processed_ = 0;
        goto err;
      }

      ep->last_msg_time_ = ev_now(loop_);
      msg.processed_ += nbytes;

      // std::size_t m, p;
      // uint8_t type;
      // uint32_t id;
      // if (msg.msg_) {
      //    readInteger(msg.msg_, type, id, m, p);
      //    LOG(INFO) << "Send " << msg.processed_ << " bytes to " <<
      // inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd << " for the current
      // DATA MSG " << msg.id_ << ", " << id << ", " << m << ", " << p;
      //}
    }

    CHECK(msg.processed_ == msg.getSize());

    if (msg.type_ != MSG_ACK) {
      LOG(INFO) << "Send a DATA message to " << inet_ntoa(ep->addr_.sin_addr)
                << " for MSG " << msg.id_ << ", len = " << msg.getSize()
                << " over fd " << fd;
      // Reset the progress counter before queueing the message on to_ack_.
      msg.processed_ = 0;
      ep->to_ack_.push(&msg);
    } else {
      // LOG(INFO) << "Send an ACK message to " << inet_ntoa(ep->addr_.sin_addr)
      // << " for MSG " << msg.id_;
      delete &msg;
    }

    // The queue stores pointers, so popping after the delete above only
    // removes the (now stale) pointer; `msg` must not be used past this point.
    ep->send_.pop();

    // for test
    // if (ep->retry_cnt_ == 0) {
    //     LOG(INFO) << "Disconnect with Endpoint " <<
    // inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
    //     close(fd);
    //     goto err;
    // }
  }
out:
  // Nothing left to send: stop watching for writability on this fd.
  if (ep->send_.empty())
    ev_io_stop(loop_, &this->fd_wwatcher_map_[fd]);
  return 0;
err:
  return -1;
}