in platform/consensus/ordering/pbft/viewchange_manager.cpp [528:598]
void ViewChangeManager::MonitoringViewChangeTimeOut() {
while (!stop_) {
// [DK3] After timer is out, the client will check if the corresponding
// client request has recevied sufficient valid responses
sem_wait(&viewchange_timer_signal_);
vc_mutex_.lock();
bool empty = true;
std::shared_ptr<ViewChangeTimeout> viewchange_timeout;
for (auto& heap : viewchange_timeout_min_heap_) {
if (!heap.second.empty()) {
viewchange_timeout = heap.second.top();
heap.second.pop();
vc_mutex_.unlock();
empty = false;
break;
}
}
if (empty) {
vc_mutex_.unlock();
continue;
}
auto current_time = GetCurrentTime();
if (viewchange_timeout->timeout_time > current_time) {
usleep(viewchange_timeout->timeout_time - current_time);
}
// [DK3] if not enough responses are received, the client broadcasts the
// client request to all replicas
if (viewchange_timeout->type == ViewChangeTimerType::TYPE_NEWVIEW) {
if (status_ == ViewChangeStatus::READY_NEW_VIEW &&
viewchange_timeout->view == system_info_->GetCurrentView()) {
// [DK12] if the replicas cannot receive a newview message in a timely
// manner, they will enter the next view and starts a new round of
// viewchange. SetCurrentViewAndNewPrimary(viewchange_timeout->view +
// 1);
LOG(ERROR) << "It is time to start a new viewchange";
checkpoint_manager_->TimeoutHandler();
}
} else if (viewchange_timeout->type ==
ViewChangeTimerType::TYPE_VIEWCHANGE) {
// [DK9] if the primary cannot get enough viewchange messages before the
// timer is out, then it broadcasts its viewchanges messages and starts
// the timer again.
if (status_ == ViewChangeStatus::READY_VIEW_CHANGE &&
viewchange_timeout->view == system_info_->GetCurrentView()) {
LOG(ERROR) << "It is time to rebroacast viewchange messages";
ChangeStatue(ViewChangeStatus::VIEW_CHANGE_FAIL);
checkpoint_manager_->TimeoutHandler();
}
} else if (viewchange_timeout->type ==
ViewChangeTimerType::TYPE_COMPLAINT) {
// [DK7] if the primary does not broadcast the request in a timely manner,
// the replica starts a viewchange
if (complaining_clients_[viewchange_timeout->proxy_id]
.CountViewChangeTimeout(viewchange_timeout->hash)) {
complaining_clients_[viewchange_timeout->proxy_id]
.EraseViewChangeTimeout(viewchange_timeout->hash);
}
std::lock_guard<std::mutex> lk(status_mutex_);
if (status_ == ViewChangeStatus::NONE &&
viewchange_timeout->view == system_info_->GetCurrentView()) {
if (viewchange_timeout->start_time >=
message_manager_->GetLastCommittedTime(
viewchange_timeout->proxy_id)) {
LOG(ERROR) << "It is time to start a viewchange";
checkpoint_manager_->TimeoutHandler();
assert(status_ == ViewChangeStatus::READY_VIEW_CHANGE);
}
}
}
}
}