void NetworkThread::onRecv()

in src/io/network/endpoint.cc [652:766]


void NetworkThread::onRecv(int fd) {

  Message *m = &pending_msgs_[fd];
  Message &msg = (*m);
  int nread;
  // EndPoint* ep = epf_->getEp(fd_ip_map_[fd]);

  CHECK(fd_ep_map_.count(fd) > 0);
  EndPoint *ep = fd_ep_map_.at(fd);

  // LOG(INFO) << "Start to read from EndPoint " <<
  // inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;

  std::unique_lock<std::mutex> lock(ep->mtx_);

  ep->last_msg_time_ = ev_now(loop_);
  while (1) {
    if (msg.processed_ < Message::hsize_) {
      nread = read(fd, msg.mdata_ + msg.processed_,
                   Message::hsize_ - msg.processed_);

      if (nread <= 0) {
        if (errno != EWOULDBLOCK || nread == 0) {
          // socket error or shuts down
          if (nread < 0)
            LOG(INFO) << "Fail to receive from EndPoint "
                      << inet_ntoa(ep->addr_.sin_addr) << ": "
                      << strerror(errno);
          else
            LOG(INFO) << "Fail to receive from EndPoint "
                      << inet_ntoa(ep->addr_.sin_addr)
                      << ": Connection reset by remote side";
          handleConnLost(fd, ep);
        }
        break;
      }

      msg.processed_ += nread;
      while (msg.processed_ >= sizeof(msg.type_) + sizeof(msg.id_)) {
        readInteger(msg.mdata_, msg.type_, msg.id_);
        if (msg.type_ == MSG_ACK) {
          LOG(INFO) << "Receive an ACK message from "
                    << inet_ntoa(ep->addr_.sin_addr) << " for MSG " << msg.id_;
          while (!ep->to_ack_.empty()) {
            Message *m = ep->to_ack_.front();
            if (m->id_ <= msg.id_) {
              delete m;
              ep->to_ack_.pop();
            } else {
              break;
            }
          }

          // reset
          msg.processed_ -= sizeof(msg.type_) + sizeof(msg.id_);
          memmove(msg.mdata_, msg.mdata_ + sizeof(msg.type_) + sizeof(msg.id_),
                  msg.processed_);

        } else
          break;
      }

      if (msg.processed_ < Message::hsize_) {
        continue;
      }

      // got the whole metadata;
      readInteger(msg.mdata_, msg.type_, msg.id_, msg.msize_, msg.psize_);

      LOG(INFO) << "Receive a message: id = " << msg.id_
                << ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_
                << " from " << inet_ntoa(ep->addr_.sin_addr) << " over fd "
                << fd;
    }

    // start reading the real data
    if (msg.msg_ == nullptr) {
      msg.msg_ = new char[msg.getSize()];
      memcpy(msg.msg_, msg.mdata_, Message::hsize_);
    }

    nread = read(fd, msg.msg_ + msg.processed_, msg.getSize() - msg.processed_);
    if (nread <= 0) {
      if (errno != EWOULDBLOCK || nread == 0) {
        // socket error or shuts down
        if (nread < 0)
          LOG(INFO) << "Fail to receive from EndPoint "
                    << inet_ntoa(ep->addr_.sin_addr) << ": " << strerror(errno);
        else
          LOG(INFO) << "Fail to receive from EndPoint "
                    << inet_ntoa(ep->addr_.sin_addr)
                    << ": Connection reset by remote side";
        handleConnLost(fd, ep);
      }
      break;
    }

    msg.processed_ += nread;

    // LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = " <<
    // msg.msize_ << ", psize_ = " << msg.psize_ << ", processed_ = " <<
    // msg.processed_ << " from " << inet_ntoa(ep->addr_.sin_addr) << " over fd
    // " << fd;

    if (msg.processed_ == msg.getSize()) {
      LOG(INFO) << "Receive a " << msg.processed_ << " bytes DATA message from "
                << inet_ntoa(ep->addr_.sin_addr) << " with id " << msg.id_;
      ep->recv_.push(new Message(static_cast<Message &&>(msg)));
      // notify of waiting thread
      ep->cv_.notify_one();
      ep->send_.push(new Message(MSG_ACK, msg.id_));
      msg.processed_ = 0;
    }
  }
}