platform/consensus/ordering/pbft/viewchange_manager.cpp (487 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "platform/consensus/ordering/pbft/viewchange_manager.h"

#include <chrono>
#include <set>
#include <thread>

#include <glog/logging.h>

#include "common/utils/utils.h"
#include "platform/consensus/ordering/pbft/transaction_utils.h"
#include "platform/proto/viewchange_message.pb.h"
namespace resdb {
// Default-constructs a complaint record for proxy id 0. Delegating keeps the
// initial state (not complaining, timeout length 10000000) in one place.
ComplaningClients::ComplaningClients() : ComplaningClients(0) {}
// Creates the complaint record for client proxy `proxy_id`. The proxy starts
// out not complaining, with a timeout length of 10000000 (presumably
// microseconds, like GetCurrentTime() — TODO confirm).
ComplaningClients::ComplaningClients(uint64_t proxy_id)
    : proxy_id(proxy_id),
      is_complaining(false),
      timeout_length_(10000000) {}
std::shared_ptr<ViewChangeTimeout> ComplaningClients::SetComplaining(
std::string hash, uint64_t view) {
this->complain_state_lock.lock();
this->is_complaining = true;
auto info = std::make_shared<ViewChangeTimeout>(
ViewChangeTimerType::TYPE_COMPLAINT, view, this->proxy_id, hash,
GetCurrentTime(), this->timeout_length_);
this->viewchange_timeout_set.insert(hash);
this->complain_state_lock.unlock();
return info;
}
uint ComplaningClients::CountViewChangeTimeout(std::string hash) {
this->complain_state_lock.lock();
uint value = this->viewchange_timeout_set.count(hash);
this->complain_state_lock.unlock();
return value;
}
void ComplaningClients::EraseViewChangeTimeout(std::string hash) {
this->complain_state_lock.lock();
this->viewchange_timeout_set.erase(hash);
this->complain_state_lock.unlock();
}
void ComplaningClients::ReleaseComplaining(std::string hash) {
this->complain_state_lock.lock();
this->viewchange_timeout_set.erase(hash);
this->complain_state_lock.unlock();
}
// A manager that drives the view-change process.
// Shared state is protected by mutexes so that operations are handled
// sequentially, which keeps the logic simpler.
// Builds the view-change manager. The view-change counter starts at 1. When
// view change is enabled in the config, it also fetches the collector pool,
// initializes the timer semaphore and starts two threads:
// MonitoringViewChangeTimeOut and MonitoringCheckpointState.
ViewChangeManager::ViewChangeManager(const ResDBConfig& config,
                                     CheckPointManager* checkpoint_manager,
                                     MessageManager* message_manager,
                                     SystemInfo* system_info,
                                     ReplicaCommunicator* replica_communicator,
                                     SignatureVerifier* verifier)
    : config_(config),
      checkpoint_manager_(checkpoint_manager),
      message_manager_(message_manager),
      system_info_(system_info),
      replica_communicator_(replica_communicator),
      verifier_(verifier),
      status_(ViewChangeStatus::NONE),
      started_(false),
      stop_(false) {
  view_change_counter_ = 1;
  global_stats_ = Stats::GetGlobalStats();
  if (!config_.GetConfigData().enable_viewchange()) {
    // No timer semaphore and no monitoring threads without view change.
    return;
  }
  collector_pool_ = message_manager->GetCollectorPool();
  sem_init(&viewchange_timer_signal_, 0, 0);
  // Fires queued view-change / new-view / complaint timers.
  server_checking_timeout_thread_ =
      std::thread(&ViewChangeManager::MonitoringViewChangeTimeOut, this);
  // Leaves view-change state when the committable sequence changes.
  checkpoint_state_thread_ =
      std::thread(&ViewChangeManager::MonitoringCheckpointState, this);
}
// Stops the checkpoint manager and the two monitoring threads, then waits
// for the threads to exit.
ViewChangeManager::~ViewChangeManager() {
  // Both monitoring loops run `while (!stop_)`. Without raising the flag the
  // joins below never return.
  stop_ = true;
  checkpoint_manager_->Stop();
  // The threads (and viewchange_timer_signal_) exist only when view change
  // was enabled in the constructor.
  if (server_checking_timeout_thread_.joinable()) {
    // The timeout thread may be blocked in sem_wait(); wake it up.
    sem_post(&viewchange_timer_signal_);
    server_checking_timeout_thread_.join();
  }
  if (checkpoint_state_thread_.joinable()) {
    // The checkpoint thread may be blocked waiting on the committable-seq
    // signal; wake it up.
    // NOTE(review): assumes no other consumer relies on this extra post.
    sem_post(checkpoint_manager_->CommitableSeqSignal());
    checkpoint_state_thread_.join();
  }
}
void ViewChangeManager::MayStart() {
if (started_) {
return;
}
started_ = true;
LOG(ERROR) << "MAYSTART";
if (config_.GetPublicKeyCertificateInfo()
.public_key()
.public_key_info()
.type() == CertificateKeyInfo::CLIENT) {
LOG(ERROR) << "client type not process view change";
return;
}
checkpoint_manager_->SetTimeoutHandler([&]() {
// LOG(ERROR) << "checkpoint timeout";
if (status_ == ViewChangeStatus::NONE) {
view_change_counter_ = 1;
} else if (status_ == ViewChangeStatus::READY_NEW_VIEW) {
// If the new view msg expires after receiving enough view
// messages, trigger a new primary.
view_change_counter_++;
}
// std::lock_guard<std::mutex> lk(status_mutex_);
if (ChangeStatue(ViewChangeStatus::READY_VIEW_CHANGE)) {
SendViewChangeMsg();
auto viewchange_timer = std::make_shared<ViewChangeTimeout>(
ViewChangeTimerType::TYPE_VIEWCHANGE, system_info_->GetCurrentView(),
config_.GetSelfInfo().id(), "null", GetCurrentTime(),
timeout_length_);
std::lock_guard<std::mutex> lk(vc_mutex_);
if (viewchange_timeout_min_heap_[config_.GetSelfInfo().id()].size() <
config_.GetMaxClientComplaintNum()) {
viewchange_timeout_min_heap_[config_.GetSelfInfo().id()].push(
viewchange_timer);
sem_post(&viewchange_timer_signal_);
}
}
});
}
// Sets the view-change status to `status` and reports whether the stored
// status equals `status` afterwards. Entering READY_VIEW_CHANGE is logged
// ("CHANGE STATUS") only when the replica was not already in that state.
// NOTE(review): barring concurrent writers this always returns true, so
// callers cannot use it to detect "already in view change" — confirm intent.
bool ViewChangeManager::ChangeStatue(ViewChangeStatus status) {
  if (status != ViewChangeStatus::READY_VIEW_CHANGE) {
    status_ = status;
  } else if (status_ != ViewChangeStatus::READY_VIEW_CHANGE) {
    LOG(ERROR) << "CHANGE STATUS";
    status_ = status;
  }
  return status_ == status;
}
// True while the replica is in any view-change state (status is not NONE).
bool ViewChangeManager::IsInViewChange() {
  const bool idle = status_ == ViewChangeStatus::NONE;
  return !idle;
}
bool ViewChangeManager::IsValidViewChangeMsg(
const ViewChangeMessage& view_change_message) {
if (view_change_message.view_number() <= system_info_->GetCurrentView()) {
LOG(ERROR) << "View number is not greater than current view:"
<< view_change_message.view_number() << "("
<< system_info_->GetCurrentView() << ")";
return false;
}
if (!checkpoint_manager_->IsValidCheckpointProof(
view_change_message.stable_ckpt())) {
LOG(ERROR) << "stable checkpoint invalid";
return false;
}
uint64_t stable_seq = view_change_message.stable_ckpt().seq();
for (const auto& prepared_msg : view_change_message.prepared_msg()) {
if (prepared_msg.seq() <= stable_seq) {
continue;
}
// If there is less than 2f+1 proof, reject.
if (prepared_msg.proof_size() < config_.GetMinDataReceiveNum()) {
LOG(ERROR) << "proof[" << prepared_msg.proof_size()
<< "] not enough:" << config_.GetMinDataReceiveNum();
return false;
}
for (const auto& proof : prepared_msg.proof()) {
if (proof.request().seq() != prepared_msg.seq()) {
LOG(ERROR) << "proof seq not match";
return false;
}
std::string data;
proof.request().SerializeToString(&data);
if (!verifier_->VerifyMessage(data, proof.signature())) {
LOG(ERROR) << "proof signature not valid";
return false;
}
}
}
return true;
}
// Stores the VIEW-CHANGE message from `sender` under its target view. A later
// message from the same sender overwrites the earlier one. Returns how many
// senders have contributed a message for that view.
uint32_t ViewChangeManager::AddRequest(
    const ViewChangeMessage& viewchange_message, uint32_t sender) {
  std::lock_guard<std::mutex> lk(vc_mutex_);
  auto& per_view = viewchange_request_[viewchange_message.view_number()];
  per_view[sender] = viewchange_message;
  return per_view.size();
}
bool ViewChangeManager::IsNextPrimary(uint64_t view_number) {
std::lock_guard<std::mutex> lk(mutex_);
const std::vector<ReplicaInfo>& replicas = config_.GetReplicaInfos();
return config_.GetReplicaInfos()[(view_number - 1) % replicas.size()].id() ==
config_.GetSelfInfo().id();
}
void ViewChangeManager::SetCurrentViewAndNewPrimary(uint64_t view_number) {
system_info_->SetCurrentView(view_number);
const std::vector<ReplicaInfo>& replicas = config_.GetReplicaInfos();
uint32_t id =
config_.GetReplicaInfos()[(view_number - 1) % replicas.size()].id();
system_info_->SetPrimary(id);
global_stats_->ChangePrimary(id);
LOG(ERROR) << "View Change Happened";
}
std::vector<std::unique_ptr<Request>> ViewChangeManager::GetPrepareMsg(
const NewViewMessage& new_view_message, bool need_sign) {
std::map<uint64_t, Request> prepared_msg; // <sequence, digest>
for (const auto& msg : new_view_message.viewchange_messages()) {
for (const auto& msg : msg.prepared_msg()) {
uint64_t seq = msg.seq();
prepared_msg[seq] = msg.proof(0).request();
}
}
uint64_t min_s = std::numeric_limits<uint64_t>::max();
for (const auto& msg : new_view_message.viewchange_messages()) {
min_s = std::min(min_s, msg.stable_ckpt().seq());
}
uint64_t max_seq = 1;
if (prepared_msg.size() > 0) {
max_seq = (--prepared_msg.end())->first;
}
LOG(INFO) << "[GP] min_s: " << min_s << " max_seq: " << max_seq;
std::vector<std::unique_ptr<Request>> redo_request;
// Resent all the request with the current view number.
for (auto i = min_s + 1; i <= max_seq; ++i) {
if (prepared_msg.find(i) == prepared_msg.end()) {
// for sequence hole, create a new request with empty data and
// sign by the new primary.
std::unique_ptr<Request> user_request = resdb::NewRequest(
Request::TYPE_PRE_PREPARE, Request(), config_.GetSelfInfo().id());
user_request->set_seq(i);
user_request->set_current_view(new_view_message.view_number());
user_request->set_hash("null" + std::to_string(i));
if (verifier_ && need_sign) {
std::string data;
auto signature_or = verifier_->SignMessage(data);
if (!signature_or.ok()) {
LOG(ERROR) << "Sign message fail";
continue;
}
*user_request->mutable_data_signature() = *signature_or;
}
redo_request.push_back(std::move(user_request));
} else {
std::unique_ptr<Request> commit_request = resdb::NewRequest(
Request::TYPE_COMMIT, prepared_msg[i], config_.GetSelfInfo().id());
commit_request->set_seq(i);
commit_request->set_current_view(new_view_message.view_number());
redo_request.push_back(std::move(commit_request));
}
}
return redo_request;
}
int ViewChangeManager::ProcessNewView(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
NewViewMessage new_view_message;
if (!new_view_message.ParseFromString(request->data())) {
LOG(ERROR) << "Parsing new_view_msg failed.";
return -2;
}
LOG(INFO) << "Received NEW-VIEW for view " << new_view_message.view_number();
// Check if new view is from next expected primary
if (new_view_message.view_number() !=
system_info_->GetCurrentView() + view_change_counter_) {
LOG(ERROR) << "View number " << new_view_message.view_number()
<< " is not the same as expected: "
<< system_info_->GetCurrentView() + 1;
return -2;
}
uint64_t min_s = std::numeric_limits<uint64_t>::max(), max_s = 0;
// Verify each view change message as the new primary does.
for (const auto& msg : new_view_message.viewchange_messages()) {
min_s = std::min(min_s, msg.stable_ckpt().seq());
max_s = std::max(max_s, msg.stable_ckpt().seq());
if (!IsValidViewChangeMsg(msg)) {
LOG(ERROR) << "view change message in the new-view message is invalid";
return -2;
}
}
// check the re-calculated request is the same as the one in the request.
auto request_list = GetPrepareMsg(new_view_message, false);
if (request_list.size() !=
static_cast<size_t>(new_view_message.request_size())) {
LOG(ERROR) << "redo request list size not match:" << request_list.size()
<< "- " << new_view_message.request_size();
return -2;
}
std::set<uint64_t> seq_set;
// only check the data.
for (size_t i = 0; i < request_list.size(); ++i) {
if (request_list[i]->data() != new_view_message.request(i).data()) {
LOG(ERROR) << "data not match";
return -2;
}
seq_set.insert(request_list[i]->seq());
}
LOG(ERROR) << "min_s: " << min_s << " max_s: " << max_s;
// Check if all the sequences in the committed list exist.
for (uint64_t i = min_s + 1; i <= max_s; i++) {
if (seq_set.find(i) == seq_set.end()) {
LOG(ERROR) << "Committed msg :" << i << " does exist";
return -2;
}
}
uint64_t max_seq = seq_set.empty() ? max_s : *(--seq_set.end());
SetCurrentViewAndNewPrimary(new_view_message.view_number());
message_manager_->SetNextSeq(max_seq + 1);
LOG(INFO) << "SetNexSeq: " << max_seq + 1;
// All is fine.
for (size_t i = 0; i < request_list.size(); ++i) {
if (new_view_message.request(i).type() ==
static_cast<int>(Request::TYPE_PRE_PREPARE)) {
new_view_message.request(i);
auto non_proposed_hashes =
collector_pool_->GetCollector(new_view_message.request(i).seq())
->GetAllStoredHash();
for (auto& hash : non_proposed_hashes) {
duplicate_manager_->EraseProposed(hash);
}
replica_communicator_->SendMessage(new_view_message.request(i),
config_.GetSelfInfo());
} else {
if (new_view_message.request(i).seq() >
checkpoint_manager_->GetHighestPreparedSeq()) {
checkpoint_manager_->SetHighestPreparedSeq(
new_view_message.request(i).seq());
}
replica_communicator_->BroadCast(new_view_message.request(i));
}
}
ChangeStatue(ViewChangeStatus::NONE);
return config_.GetSelfInfo().id() == system_info_->GetPrimaryId() ? -4 : 0;
}
// Handles an incoming VIEW-CHANGE message. Unparsable or invalid messages
// return -2. Valid ones are recorded. Once GetMinDataReceiveNum() senders
// have asked for the same view:
//  - the primary of that view sends NEW-VIEW;
//  - every other replica arms a TYPE_NEWVIEW timer;
//  - the status moves to READY_NEW_VIEW.
// Returns 0 for every accepted message.
int ViewChangeManager::ProcessViewChange(std::unique_ptr<Context> context,
                                         std::unique_ptr<Request> request) {
  ViewChangeMessage viewchange_message;
  if (!viewchange_message.ParseFromString(request->data())) {
    LOG(ERROR) << "pase view change data fail";
    return -2;
  }
  if (!IsValidViewChangeMsg(viewchange_message)) {
    LOG(ERROR) << "view change msg not valid from:" << request->sender_id();
    return -2;
  }
  LOG(ERROR) << "ViewChange message from " << request->sender_id();
  const size_t votes = AddRequest(viewchange_message, request->sender_id());
  if (votes < config_.GetMinDataReceiveNum()) {
    // Not enough VIEW-CHANGE messages for this view yet.
    return 0;
  }
  const auto target_view = viewchange_message.view_number();
  if (!IsNextPrimary(target_view)) {
    // Wait for the new primary's NEW-VIEW, with a timeout.
    auto newview_timer = std::make_shared<ViewChangeTimeout>(
        ViewChangeTimerType::TYPE_NEWVIEW, system_info_->GetCurrentView(),
        config_.GetSelfInfo().id(), "null", GetCurrentTime(),
        timeout_length_);
    std::lock_guard<std::mutex> lk(vc_mutex_);
    auto& own_timers =
        viewchange_timeout_min_heap_[config_.GetSelfInfo().id()];
    if (own_timers.size() < config_.GetMaxClientComplaintNum()) {
      own_timers.push(newview_timer);
      sem_post(&viewchange_timer_signal_);
    }
  } else {
    std::lock_guard<std::mutex> lk(mutex_);
    SendNewViewMsg(target_view);
  }
  ChangeStatue(ViewChangeStatus::READY_NEW_VIEW);
  return 0;
}
void ViewChangeManager::SendNewViewMsg(uint64_t view_number) {
if (new_view_is_sent_) {
return;
}
new_view_is_sent_ = true;
// PBFT Paper - Primary determines the sequence number min-s of the latest
// stable checkpoint in V and the highest sequence number max-s in a prepare
// message in V
// uint64_t min_s = std::numeric_limits<uint64_t>::max();
std::lock_guard<std::mutex> lk(vc_mutex_);
auto requests = viewchange_request_[view_number];
NewViewMessage new_view_message;
new_view_message.set_view_number(view_number);
std::map<uint64_t, std::string> new_view_request; // <sequence, digest>
for (auto& it : requests) {
ViewChangeMessage& msg = it.second;
LOG(ERROR) << "msg.view_number(): " << msg.view_number()
<< " view_number: " << view_number << " sender: " << it.first
<< " msg.stable_ckpt(): " << msg.stable_ckpt().seq();
msg.set_view_number(view_number);
*new_view_message.add_viewchange_messages() = msg;
}
// Get the redo message from the primary. This could not be done
// in each replica because the primary has to signed some empty
// request if needed.
std::vector<std::unique_ptr<Request>> request_list =
GetPrepareMsg(new_view_message);
for (const auto& request : request_list) {
*new_view_message.add_request() = *request;
}
// Broadcast my new view request.
std::unique_ptr<Request> request =
NewRequest(Request::TYPE_NEWVIEW, Request(), config_.GetSelfInfo().id());
new_view_message.SerializeToString(request->mutable_data());
replica_communicator_->BroadCast(*request);
}
// Broadcasts this replica's VIEW-CHANGE message <VIEW-CHANGE, v + x, n, C, P>
// (PBFT paper):
//  - v + x: current view + view_change_counter_;
//  - n, C: the stable checkpoint with its votes;
//  - P: a prepared proof for every sequence above the stable checkpoint, up
//    to the highest prepared sequence, whose transaction state is
//    READY_COMMIT or later.
void ViewChangeManager::SendViewChangeMsg() {
  ViewChangeMessage view_change_message;
  // v + x (view number of the next expected primary)
  view_change_message.set_view_number(system_info_->GetCurrentView() +
                                      view_change_counter_);
  LOG(ERROR) << "current view: " << system_info_->GetCurrentView()
             << " view number: " << view_change_message.view_number()
             << " view_change_counter_ " << view_change_counter_;
  assert(view_change_message.view_number() ==
         system_info_->GetCurrentView() + view_change_counter_);
  // n (sequence number of the latest checkpoint) and C (proof for the stable
  // checkpoint)
  *view_change_message.mutable_stable_ckpt() =
      checkpoint_manager_->GetStableCheckpointWithVotes();
  // P - a set containing a set Pm for each request m that prepared at this
  // replica with a sequence number higher than n. Sequence numbers are
  // 64-bit; keep the loop in uint64_t so they are not truncated.
  const uint64_t stable_seq = view_change_message.stable_ckpt().seq();
  const uint64_t max_seq = checkpoint_manager_->GetHighestPreparedSeq();
  LOG(INFO) << "Check prepared or committed txns from " << stable_seq + 1
            << " to " << max_seq;
  for (uint64_t i = stable_seq + 1; i <= max_seq; ++i) {
    // seq i has been prepared or committed.
    if (message_manager_->GetTransactionState(i) >=
        TransactionStatue::READY_COMMIT) {
      std::vector<RequestInfo> proof_info =
          message_manager_->GetPreparedProof(i);
      assert(proof_info.size() >= config_.GetMinDataReceiveNum());
      auto txn = view_change_message.add_prepared_msg();
      txn->set_seq(i);
      for (const auto& info : proof_info) {
        auto proof = txn->add_proof();
        *proof->mutable_request() = *info.request;
        *proof->mutable_signature() = info.signature;
      }
    }
  }
  // Broadcast my view change request.
  std::unique_ptr<Request> request = NewRequest(
      Request::TYPE_VIEWCHANGE, Request(), config_.GetSelfInfo().id());
  view_change_message.SerializeToString(request->mutable_data());
  replica_communicator_->BroadCast(*request);
}
void ViewChangeManager::AddComplaintTimer(uint64_t proxy_id, std::string hash) {
LOG(ERROR) << "ADDING COMPLAINT";
std::lock_guard<std::mutex> lk(vc_mutex_);
if (complaining_clients_.count(proxy_id) == 0) {
complaining_clients_[proxy_id].set_proxy_id(proxy_id);
}
auto complaint_ = complaining_clients_[proxy_id].SetComplaining(
hash, system_info_->GetCurrentView());
if (viewchange_timeout_min_heap_[proxy_id].size() <
config_.GetMaxClientComplaintNum()) {
viewchange_timeout_min_heap_[proxy_id].push(complaint_);
sem_post(&viewchange_timer_signal_);
} else {
// LOG(INFO) << "The number of complaints reaches the maximum value";
}
}
void ViewChangeManager::MonitoringViewChangeTimeOut() {
while (!stop_) {
// [DK3] After timer is out, the client will check if the corresponding
// client request has recevied sufficient valid responses
sem_wait(&viewchange_timer_signal_);
vc_mutex_.lock();
bool empty = true;
std::shared_ptr<ViewChangeTimeout> viewchange_timeout;
for (auto& heap : viewchange_timeout_min_heap_) {
if (!heap.second.empty()) {
viewchange_timeout = heap.second.top();
heap.second.pop();
vc_mutex_.unlock();
empty = false;
break;
}
}
if (empty) {
vc_mutex_.unlock();
continue;
}
auto current_time = GetCurrentTime();
if (viewchange_timeout->timeout_time > current_time) {
usleep(viewchange_timeout->timeout_time - current_time);
}
// [DK3] if not enough responses are received, the client broadcasts the
// client request to all replicas
if (viewchange_timeout->type == ViewChangeTimerType::TYPE_NEWVIEW) {
if (status_ == ViewChangeStatus::READY_NEW_VIEW &&
viewchange_timeout->view == system_info_->GetCurrentView()) {
// [DK12] if the replicas cannot receive a newview message in a timely
// manner, they will enter the next view and starts a new round of
// viewchange. SetCurrentViewAndNewPrimary(viewchange_timeout->view +
// 1);
LOG(ERROR) << "It is time to start a new viewchange";
checkpoint_manager_->TimeoutHandler();
}
} else if (viewchange_timeout->type ==
ViewChangeTimerType::TYPE_VIEWCHANGE) {
// [DK9] if the primary cannot get enough viewchange messages before the
// timer is out, then it broadcasts its viewchanges messages and starts
// the timer again.
if (status_ == ViewChangeStatus::READY_VIEW_CHANGE &&
viewchange_timeout->view == system_info_->GetCurrentView()) {
LOG(ERROR) << "It is time to rebroacast viewchange messages";
ChangeStatue(ViewChangeStatus::VIEW_CHANGE_FAIL);
checkpoint_manager_->TimeoutHandler();
}
} else if (viewchange_timeout->type ==
ViewChangeTimerType::TYPE_COMPLAINT) {
// [DK7] if the primary does not broadcast the request in a timely manner,
// the replica starts a viewchange
if (complaining_clients_[viewchange_timeout->proxy_id]
.CountViewChangeTimeout(viewchange_timeout->hash)) {
complaining_clients_[viewchange_timeout->proxy_id]
.EraseViewChangeTimeout(viewchange_timeout->hash);
}
std::lock_guard<std::mutex> lk(status_mutex_);
if (status_ == ViewChangeStatus::NONE &&
viewchange_timeout->view == system_info_->GetCurrentView()) {
if (viewchange_timeout->start_time >=
message_manager_->GetLastCommittedTime(
viewchange_timeout->proxy_id)) {
LOG(ERROR) << "It is time to start a viewchange";
checkpoint_manager_->TimeoutHandler();
assert(status_ == ViewChangeStatus::READY_VIEW_CHANGE);
}
}
}
}
}
// Checkpoint thread body. Whenever the checkpoint manager signals its
// committable sequence and that value differs from the last one seen, any
// in-progress view change is abandoned (status set back to NONE).
void ViewChangeManager::MonitoringCheckpointState() {
  uint64_t previous_seq = 0;
  while (!stop_) {
    sem_wait(checkpoint_manager_->CommitableSeqSignal());
    const auto committable_seq = checkpoint_manager_->GetCommittableSeq();
    if (committable_seq == previous_seq) {
      continue;
    }
    previous_seq = committable_seq;
    if (IsInViewChange()) {
      ChangeStatue(ViewChangeStatus::NONE);
    }
  }
}
// Stores the DuplicateManager that ProcessNewView() uses when re-proposing
// requests. Kept as a raw pointer; ownership presumably stays with the caller.
void ViewChangeManager::SetDuplicateManager(DuplicateManager* manager) {
  this->duplicate_manager_ = manager;
}
} // namespace resdb