in src/io/network/endpoint.cc [652:766]
void NetworkThread::onRecv(int fd) {
Message *m = &pending_msgs_[fd];
Message &msg = (*m);
int nread;
// EndPoint* ep = epf_->getEp(fd_ip_map_[fd]);
CHECK(fd_ep_map_.count(fd) > 0);
EndPoint *ep = fd_ep_map_.at(fd);
// LOG(INFO) << "Start to read from EndPoint " <<
// inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
std::unique_lock<std::mutex> lock(ep->mtx_);
ep->last_msg_time_ = ev_now(loop_);
while (1) {
if (msg.processed_ < Message::hsize_) {
nread = read(fd, msg.mdata_ + msg.processed_,
Message::hsize_ - msg.processed_);
if (nread <= 0) {
if (errno != EWOULDBLOCK || nread == 0) {
// socket error or shuts down
if (nread < 0)
LOG(INFO) << "Fail to receive from EndPoint "
<< inet_ntoa(ep->addr_.sin_addr) << ": "
<< strerror(errno);
else
LOG(INFO) << "Fail to receive from EndPoint "
<< inet_ntoa(ep->addr_.sin_addr)
<< ": Connection reset by remote side";
handleConnLost(fd, ep);
}
break;
}
msg.processed_ += nread;
while (msg.processed_ >= sizeof(msg.type_) + sizeof(msg.id_)) {
readInteger(msg.mdata_, msg.type_, msg.id_);
if (msg.type_ == MSG_ACK) {
LOG(INFO) << "Receive an ACK message from "
<< inet_ntoa(ep->addr_.sin_addr) << " for MSG " << msg.id_;
while (!ep->to_ack_.empty()) {
Message *m = ep->to_ack_.front();
if (m->id_ <= msg.id_) {
delete m;
ep->to_ack_.pop();
} else {
break;
}
}
// reset
msg.processed_ -= sizeof(msg.type_) + sizeof(msg.id_);
memmove(msg.mdata_, msg.mdata_ + sizeof(msg.type_) + sizeof(msg.id_),
msg.processed_);
} else
break;
}
if (msg.processed_ < Message::hsize_) {
continue;
}
// got the whole metadata;
readInteger(msg.mdata_, msg.type_, msg.id_, msg.msize_, msg.psize_);
LOG(INFO) << "Receive a message: id = " << msg.id_
<< ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_
<< " from " << inet_ntoa(ep->addr_.sin_addr) << " over fd "
<< fd;
}
// start reading the real data
if (msg.msg_ == nullptr) {
msg.msg_ = new char[msg.getSize()];
memcpy(msg.msg_, msg.mdata_, Message::hsize_);
}
nread = read(fd, msg.msg_ + msg.processed_, msg.getSize() - msg.processed_);
if (nread <= 0) {
if (errno != EWOULDBLOCK || nread == 0) {
// socket error or shuts down
if (nread < 0)
LOG(INFO) << "Fail to receive from EndPoint "
<< inet_ntoa(ep->addr_.sin_addr) << ": " << strerror(errno);
else
LOG(INFO) << "Fail to receive from EndPoint "
<< inet_ntoa(ep->addr_.sin_addr)
<< ": Connection reset by remote side";
handleConnLost(fd, ep);
}
break;
}
msg.processed_ += nread;
// LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = " <<
// msg.msize_ << ", psize_ = " << msg.psize_ << ", processed_ = " <<
// msg.processed_ << " from " << inet_ntoa(ep->addr_.sin_addr) << " over fd
// " << fd;
if (msg.processed_ == msg.getSize()) {
LOG(INFO) << "Receive a " << msg.processed_ << " bytes DATA message from "
<< inet_ntoa(ep->addr_.sin_addr) << " with id " << msg.id_;
ep->recv_.push(new Message(static_cast<Message &&>(msg)));
// notify of waiting thread
ep->cv_.notify_one();
ep->send_.push(new Message(MSG_ACK, msg.id_));
msg.processed_ = 0;
}
}
}