in platform/consensus/ordering/pbft/checkpoint_manager.cpp [166:277]
void CheckPointManager::UpdateStableCheckPointStatus() {
uint64_t last_committable_seq = 0;
while (!stop_) {
if (!Wait()) {
continue;
}
uint64_t stable_seq = 0;
std::string stable_hash;
{
std::lock_guard<std::mutex> lk(mutex_);
for (auto it : sender_ckpt_) {
if (it.second.size() >=
static_cast<size_t>(config_.GetMinCheckpointReceiveNum())) {
committable_seq_ = it.first.first;
committable_hash_ = it.first.second;
std::set<uint32_t> senders_ =
sender_ckpt_[std::make_pair(committable_seq_, committable_hash_)];
sem_post(&committable_seq_signal_);
if (last_seq_ < committable_seq_ &&
last_committable_seq < committable_seq_) {
auto replicas_ = config_.GetReplicaInfos();
for (auto& replica_ : replicas_) {
std::string last_hash;
uint64_t last_seq;
{
std::lock_guard<std::mutex> lk(lt_mutex_);
last_hash = last_hash_;
// last_seq_ = last_seq > last_committable_seq ? last_seq :
// last_committable_seq;
last_seq = last_seq_;
}
if (senders_.count(replica_.id()) &&
last_seq < committable_seq_) {
// LOG(ERROR) << "GetRequestFromReplica " << last_seq_ + 1 << "
// " << committable_seq_;
auto requests = txn_accessor_.GetRequestFromReplica(
last_seq + 1, committable_seq_, replica_);
if (requests.ok()) {
bool fail = false;
for (auto& request : *requests) {
if (SignatureVerifier::CalculateHash(request.data()) !=
request.hash()) {
LOG(ERROR)
<< "The hash of the request does not match the data.";
fail = true;
break;
}
last_hash = GetHash(last_hash, request.hash());
}
if (fail) {
continue;
} else if (last_hash != committable_hash_) {
LOG(ERROR) << "The hash of requests returned do not match. "
<< last_seq + 1 << " " << committable_seq_;
} else {
last_committable_seq = committable_seq_;
for (auto& request : *requests) {
if (executor_) {
executor_->Commit(std::make_unique<Request>(request));
}
}
SetHighestPreparedSeq(committable_seq_);
// LOG(ERROR) << "[4]";
break;
}
}
}
}
}
}
if (it.second.size() >=
static_cast<size_t>(config_.GetMinDataReceiveNum())) {
stable_seq = it.first.first;
stable_hash = it.first.second;
}
}
new_data_ = 0;
}
// LOG(ERROR) << "current stable seq:" << current_stable_seq_
// << " stable seq:" << stable_seq;
std::vector<SignatureInfo> votes;
if (current_stable_seq_ < stable_seq) {
std::lock_guard<std::mutex> lk(mutex_);
votes = sign_ckpt_[std::make_pair(stable_seq, stable_hash)];
std::set<uint32_t> senders_ =
sender_ckpt_[std::make_pair(stable_seq, stable_hash)];
auto it = sender_ckpt_.begin();
while (it != sender_ckpt_.end()) {
if (it->first.first <= stable_seq) {
sign_ckpt_.erase(sign_ckpt_.find(it->first));
auto tmp = it++;
sender_ckpt_.erase(tmp);
} else {
it++;
}
}
stable_ckpt_.set_seq(stable_seq);
stable_ckpt_.set_hash(stable_hash);
stable_ckpt_.mutable_signatures()->Clear();
for (auto vote : votes) {
*stable_ckpt_.add_signatures() = vote;
}
current_stable_seq_ = stable_seq;
// LOG(INFO) << "done. stable seq:" << current_stable_seq_
// << " votes:" << stable_ckpt_.DebugString();
// LOG(INFO) << "done. stable seq:" << current_stable_seq_;
}
UpdateStableCheckPointCallback(current_stable_seq_);
}
}