void CheckPointManager::UpdateStableCheckPointStatus()

in platform/consensus/ordering/pbft/checkpoint_manager.cpp [166:277]


void CheckPointManager::UpdateStableCheckPointStatus() {
  uint64_t last_committable_seq = 0;
  while (!stop_) {
    if (!Wait()) {
      continue;
    }
    uint64_t stable_seq = 0;
    std::string stable_hash;
    {
      std::lock_guard<std::mutex> lk(mutex_);
      for (auto it : sender_ckpt_) {
        if (it.second.size() >=
            static_cast<size_t>(config_.GetMinCheckpointReceiveNum())) {
          committable_seq_ = it.first.first;
          committable_hash_ = it.first.second;
          std::set<uint32_t> senders_ =
              sender_ckpt_[std::make_pair(committable_seq_, committable_hash_)];
          sem_post(&committable_seq_signal_);
          if (last_seq_ < committable_seq_ &&
              last_committable_seq < committable_seq_) {
            auto replicas_ = config_.GetReplicaInfos();
            for (auto& replica_ : replicas_) {
              std::string last_hash;
              uint64_t last_seq;
              {
                std::lock_guard<std::mutex> lk(lt_mutex_);
                last_hash = last_hash_;
                // last_seq_ = last_seq > last_committable_seq ? last_seq :
                // last_committable_seq;
                last_seq = last_seq_;
              }
              if (senders_.count(replica_.id()) &&
                  last_seq < committable_seq_) {
                // LOG(ERROR) << "GetRequestFromReplica " << last_seq_ + 1 << "
                // " << committable_seq_;
                auto requests = txn_accessor_.GetRequestFromReplica(
                    last_seq + 1, committable_seq_, replica_);
                if (requests.ok()) {
                  bool fail = false;
                  for (auto& request : *requests) {
                    if (SignatureVerifier::CalculateHash(request.data()) !=
                        request.hash()) {
                      LOG(ERROR)
                          << "The hash of the request does not match the data.";
                      fail = true;
                      break;
                    }
                    last_hash = GetHash(last_hash, request.hash());
                  }
                  if (fail) {
                    continue;
                  } else if (last_hash != committable_hash_) {
                    LOG(ERROR) << "The hash of requests returned do not match. "
                               << last_seq + 1 << " " << committable_seq_;
                  } else {
                    last_committable_seq = committable_seq_;
                    for (auto& request : *requests) {
                      if (executor_) {
                        executor_->Commit(std::make_unique<Request>(request));
                      }
                    }
                    SetHighestPreparedSeq(committable_seq_);
                    // LOG(ERROR) << "[4]";
                    break;
                  }
                }
              }
            }
          }
        }
        if (it.second.size() >=
            static_cast<size_t>(config_.GetMinDataReceiveNum())) {
          stable_seq = it.first.first;
          stable_hash = it.first.second;
        }
      }
      new_data_ = 0;
    }

    // LOG(ERROR) << "current stable seq:" << current_stable_seq_
    //  << " stable seq:" << stable_seq;
    std::vector<SignatureInfo> votes;
    if (current_stable_seq_ < stable_seq) {
      std::lock_guard<std::mutex> lk(mutex_);
      votes = sign_ckpt_[std::make_pair(stable_seq, stable_hash)];
      std::set<uint32_t> senders_ =
          sender_ckpt_[std::make_pair(stable_seq, stable_hash)];

      auto it = sender_ckpt_.begin();
      while (it != sender_ckpt_.end()) {
        if (it->first.first <= stable_seq) {
          sign_ckpt_.erase(sign_ckpt_.find(it->first));
          auto tmp = it++;
          sender_ckpt_.erase(tmp);
        } else {
          it++;
        }
      }
      stable_ckpt_.set_seq(stable_seq);
      stable_ckpt_.set_hash(stable_hash);
      stable_ckpt_.mutable_signatures()->Clear();
      for (auto vote : votes) {
        *stable_ckpt_.add_signatures() = vote;
      }
      current_stable_seq_ = stable_seq;
      // LOG(INFO) << "done. stable seq:" << current_stable_seq_
      //           << " votes:" << stable_ckpt_.DebugString();
      // LOG(INFO) << "done. stable seq:" << current_stable_seq_;
    }
    UpdateStableCheckPointCallback(current_stable_seq_);
  }
}