in platform/consensus/ordering/pbft/viewchange_manager.cpp [290:378]
int ViewChangeManager::ProcessNewView(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
NewViewMessage new_view_message;
if (!new_view_message.ParseFromString(request->data())) {
LOG(ERROR) << "Parsing new_view_msg failed.";
return -2;
}
LOG(INFO) << "Received NEW-VIEW for view " << new_view_message.view_number();
// Check if new view is from next expected primary
if (new_view_message.view_number() !=
system_info_->GetCurrentView() + view_change_counter_) {
LOG(ERROR) << "View number " << new_view_message.view_number()
<< " is not the same as expected: "
<< system_info_->GetCurrentView() + 1;
return -2;
}
uint64_t min_s = std::numeric_limits<uint64_t>::max(), max_s = 0;
// Verify each view change message as the new primary does.
for (const auto& msg : new_view_message.viewchange_messages()) {
min_s = std::min(min_s, msg.stable_ckpt().seq());
max_s = std::max(max_s, msg.stable_ckpt().seq());
if (!IsValidViewChangeMsg(msg)) {
LOG(ERROR) << "view change message in the new-view message is invalid";
return -2;
}
}
// check the re-calculated request is the same as the one in the request.
auto request_list = GetPrepareMsg(new_view_message, false);
if (request_list.size() !=
static_cast<size_t>(new_view_message.request_size())) {
LOG(ERROR) << "redo request list size not match:" << request_list.size()
<< "- " << new_view_message.request_size();
return -2;
}
std::set<uint64_t> seq_set;
// only check the data.
for (size_t i = 0; i < request_list.size(); ++i) {
if (request_list[i]->data() != new_view_message.request(i).data()) {
LOG(ERROR) << "data not match";
return -2;
}
seq_set.insert(request_list[i]->seq());
}
LOG(ERROR) << "min_s: " << min_s << " max_s: " << max_s;
// Check if all the sequences in the committed list exist.
for (uint64_t i = min_s + 1; i <= max_s; i++) {
if (seq_set.find(i) == seq_set.end()) {
LOG(ERROR) << "Committed msg :" << i << " does exist";
return -2;
}
}
uint64_t max_seq = seq_set.empty() ? max_s : *(--seq_set.end());
SetCurrentViewAndNewPrimary(new_view_message.view_number());
message_manager_->SetNextSeq(max_seq + 1);
LOG(INFO) << "SetNexSeq: " << max_seq + 1;
// All is fine.
for (size_t i = 0; i < request_list.size(); ++i) {
if (new_view_message.request(i).type() ==
static_cast<int>(Request::TYPE_PRE_PREPARE)) {
new_view_message.request(i);
auto non_proposed_hashes =
collector_pool_->GetCollector(new_view_message.request(i).seq())
->GetAllStoredHash();
for (auto& hash : non_proposed_hashes) {
duplicate_manager_->EraseProposed(hash);
}
replica_communicator_->SendMessage(new_view_message.request(i),
config_.GetSelfInfo());
} else {
if (new_view_message.request(i).seq() >
checkpoint_manager_->GetHighestPreparedSeq()) {
checkpoint_manager_->SetHighestPreparedSeq(
new_view_message.request(i).seq());
}
replica_communicator_->BroadCast(new_view_message.request(i));
}
}
ChangeStatue(ViewChangeStatus::NONE);
return config_.GetSelfInfo().id() == system_info_->GetPrimaryId() ? -4 : 0;
}