platform/consensus/ordering/pbft/checkpoint_manager.cpp (312 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "platform/consensus/ordering/pbft/checkpoint_manager.h"

#include <glog/logging.h>
#include <semaphore.h>

#include <chrono>
#include <mutex>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "platform/consensus/ordering/pbft/transaction_utils.h"
#include "platform/proto/checkpoint_info.pb.h"

namespace resdb {

// Builds the checkpoint manager. When view change is enabled, checkpointing is
// force-enabled, and when checkpointing is enabled two worker threads are
// started: one consuming committed requests (UpdateCheckPointStatus) and one
// aggregating checkpoint votes (UpdateStableCheckPointStatus).
CheckPointManager::CheckPointManager(const ResDBConfig& config,
                                     ReplicaCommunicator* replica_communicator,
                                     SignatureVerifier* verifier)
    : config_(config),
      replica_communicator_(replica_communicator),
      txn_db_(std::make_unique<ChainState>()),
      verifier_(verifier),
      stop_(false),
      txn_accessor_(config),
      highest_prepared_seq_(0) {
  current_stable_seq_ = 0;
  // UpdateStableCheckPointStatus() posts this semaphore, so it must be
  // initialized before any worker thread can run.
  sem_init(&committable_seq_signal_, 0, 0);
  if (config_.GetConfigData().enable_viewchange()) {
    config_.EnableCheckPoint(true);
  }
  if (config_.IsCheckPointEnabled()) {
    stable_checkpoint_thread_ =
        std::thread(&CheckPointManager::UpdateStableCheckPointStatus, this);
    checkpoint_thread_ =
        std::thread(&CheckPointManager::UpdateCheckPointStatus, this);
  }
}

// Stops and joins the worker threads, then releases the semaphore they used.
CheckPointManager::~CheckPointManager() {
  Stop();
  // Both worker threads have been joined by Stop(), so this object no longer
  // posts the semaphore.
  sem_destroy(&committable_seq_signal_);
}

// Signals the worker threads to exit and joins them if they were started.
void CheckPointManager::Stop() {
  stop_ = true;
  if (checkpoint_thread_.joinable()) {
    checkpoint_thread_.join();
  }
  if (stable_checkpoint_thread_.joinable()) {
    stable_checkpoint_thread_.join();
  }
}

// Chains two hashes: returns CalculateHash(h1 + h2).
std::string GetHash(const std::string& h1, const std::string& h2) {
  return SignatureVerifier::CalculateHash(h1 + h2);
}

// Returns the non-owning pointer to the transaction store.
ChainState* CheckPointManager::GetTxnDB() { return txn_db_.get(); }

// Returns the highest sequence number stored in the transaction store.
uint64_t CheckPointManager::GetMaxTxnSeq() { return txn_db_->GetMaxSeq(); }

// Returns the sequence number of the latest stable checkpoint.
uint64_t CheckPointManager::GetStableCheckpoint() {
  std::lock_guard<std::mutex> lk(mutex_);
  return current_stable_seq_;
}

// Returns a copy of the latest stable checkpoint including its vote
// signatures.
StableCheckPoint CheckPointManager::GetStableCheckpointWithVotes() {
  std::lock_guard<std::mutex> lk(mutex_);
  return stable_ckpt_;
}

// Hands a committed request to the checkpoint pipeline. With checkpointing
// enabled it is queued for UpdateCheckPointStatus(); otherwise it is written to
// the transaction store directly.
void CheckPointManager::AddCommitData(std::unique_ptr<Request> request) {
  if (config_.IsCheckPointEnabled()) {
    data_queue_.Push(std::move(request));
  } else {
    txn_db_->Put(std::move(request));
  }
}

// Checks whether `stable_ckpt` carries a valid proof: every signature must
// verify against the checkpoint hash, and the distinct signers must reach
// GetMinDataReceiveNum(). The genesis checkpoint (seq 0) with no signatures is
// also accepted.
// Returns false as soon as one signature fails verification.
bool CheckPointManager::IsValidCheckpointProof(
    const StableCheckPoint& stable_ckpt) {
  // Validate the checkpoint that was passed in, not this replica's own
  // stable_ckpt_.
  const std::string& hash = stable_ckpt.hash();
  std::set<uint32_t> senders;
  for (const auto& signature : stable_ckpt.signatures()) {
    // verifier_ is optional elsewhere in this class (see ProcessCheckPoint and
    // BroadcastCheckPoint), so only verify when one is configured.
    if (verifier_ != nullptr && !verifier_->VerifyMessage(hash, signature)) {
      return false;
    }
    senders.insert(signature.node_id());
  }
  return (static_cast<int>(senders.size()) >= config_.GetMinDataReceiveNum()) ||
         (stable_ckpt.seq() == 0 && senders.empty());
}

// Handles a CHECKPOINT message from another replica.
// Returns 0 on success, or -2 when one of these holds:
//   - the payload does not parse;
//   - the sequence is not a multiple of the checkpoint watermark;
//   - the signature does not verify.
// A vote is counted once per (seq, hash, sender); a new vote wakes the
// stable-checkpoint thread.
int CheckPointManager::ProcessCheckPoint(std::unique_ptr<Context> context,
                                         std::unique_ptr<Request> request) {
  CheckPointData checkpoint_data;
  if (!checkpoint_data.ParseFromString(request->data())) {
    LOG(ERROR) << "parse checkpont data fail:";
    return -2;
  }
  uint64_t checkpoint_seq = checkpoint_data.seq();
  uint32_t sender_id = request->sender_id();
  int water_mark = config_.GetCheckPointWaterMark();
  // A non-positive watermark would make the modulo below undefined (zero) or
  // meaningless (negative converted to a huge unsigned value).
  if (water_mark <= 0 || checkpoint_seq % water_mark) {
    LOG(ERROR) << "checkpoint seq invalid:" << checkpoint_seq;
    return -2;
  }
  if (verifier_) {
    // check signatures
    bool valid = verifier_->VerifyMessage(checkpoint_data.hash(),
                                          checkpoint_data.hash_signature());
    if (!valid) {
      LOG(ERROR) << "request is not valid:"
                 << checkpoint_data.hash_signature().DebugString();
      return -2;
    }
  }

  {
    std::lock_guard<std::mutex> lk(mutex_);
    const auto key = std::make_pair(checkpoint_seq, checkpoint_data.hash());
    auto& senders = sender_ckpt_[key];
    const bool inserted = senders.insert(sender_id).second;
    if (inserted) {
      sign_ckpt_[key].push_back(checkpoint_data.hash_signature());
      new_data_++;
      // Record the hash list only from the first sender of this
      // (seq, hash); a duplicate message from that sender must not append it
      // again.
      if (senders.size() == 1) {
        for (const auto& h : checkpoint_data.hashs()) {
          hash_ckpt_[key].push_back(h);
        }
      }
    }
    Notify();
  }
  return 0;
}

// Wakes any thread blocked in Wait().
void CheckPointManager::Notify() {
  std::lock_guard<std::mutex> lk(cv_mutex_);
  cv_.notify_all();
}

// Blocks for up to one second until new checkpoint votes are available.
// Returns true if new_data_ > 0 when the wait ends.
// NOTE(review): new_data_ is written under mutex_ but read here under
// cv_mutex_; this is only race-free if new_data_ is atomic (declared in the
// header) — confirm.
bool CheckPointManager::Wait() {
  int timeout_ms = 1000;
  std::unique_lock<std::mutex> lk(cv_mutex_);
  return cv_.wait_for(lk, std::chrono::milliseconds(timeout_ms),
                      [&] { return new_data_ > 0; });
}

// Worker loop that turns checkpoint votes into progress. Each pass does:
//   - For every (seq, hash) with at least GetMinCheckpointReceiveNum() votes,
//     publish it as committable and post committable_seq_signal_.
//   - If this replica is behind that committable seq, fetch the missing
//     requests from a replica that voted for it. The fetched requests are
//     verified against the voted chained hash and committed via executor_.
//   - For the highest (seq, hash) with at least GetMinDataReceiveNum() votes,
//     promote it to the stable checkpoint and drop all vote state at or
//     below it.
void CheckPointManager::UpdateStableCheckPointStatus() {
  uint64_t last_committable_seq = 0;
  while (!stop_) {
    if (!Wait()) {
      continue;
    }
    uint64_t stable_seq = 0;
    std::string stable_hash;
    {
      std::lock_guard<std::mutex> lk(mutex_);
      // Iterate by reference: copying each key and sender set is wasted work,
      // and the map is not modified inside this loop.
      for (const auto& it : sender_ckpt_) {
        const auto& senders = it.second;
        if (senders.size() >=
            static_cast<size_t>(config_.GetMinCheckpointReceiveNum())) {
          {
            // GetCommittableSeq() reads committable_seq_ under lt_mutex_.
            std::lock_guard<std::mutex> lt_lk(lt_mutex_);
            committable_seq_ = it.first.first;
          }
          committable_hash_ = it.first.second;
          sem_post(&committable_seq_signal_);

          uint64_t executed_seq = 0;
          {
            // last_seq_ is advanced by UpdateCheckPointStatus under lt_mutex_.
            std::lock_guard<std::mutex> lt_lk(lt_mutex_);
            executed_seq = last_seq_;
          }
          if (executed_seq < committable_seq_ &&
              last_committable_seq < committable_seq_) {
            auto replicas = config_.GetReplicaInfos();
            for (auto& replica : replicas) {
              std::string last_hash;
              uint64_t last_seq;
              {
                std::lock_guard<std::mutex> lt_lk(lt_mutex_);
                last_hash = last_hash_;
                last_seq = last_seq_;
              }
              // Only ask replicas that actually voted for this checkpoint.
              if (!senders.count(replica.id()) ||
                  last_seq >= committable_seq_) {
                continue;
              }
              auto requests = txn_accessor_.GetRequestFromReplica(
                  last_seq + 1, committable_seq_, replica);
              if (!requests.ok()) {
                continue;
              }
              bool fail = false;
              for (auto& request : *requests) {
                if (SignatureVerifier::CalculateHash(request.data()) !=
                    request.hash()) {
                  LOG(ERROR)
                      << "The hash of the request does not match the data.";
                  fail = true;
                  break;
                }
                last_hash = GetHash(last_hash, request.hash());
              }
              if (fail) {
                continue;
              }
              if (last_hash != committable_hash_) {
                LOG(ERROR) << "The hash of requests returned do not match. "
                           << last_seq + 1 << " " << committable_seq_;
                continue;
              }
              // The fetched batch reproduces the voted chained hash: commit it.
              last_committable_seq = committable_seq_;
              for (auto& request : *requests) {
                if (executor_) {
                  executor_->Commit(std::make_unique<Request>(request));
                }
              }
              SetHighestPreparedSeq(committable_seq_);
              break;
            }
          }
        }
        if (senders.size() >=
            static_cast<size_t>(config_.GetMinDataReceiveNum())) {
          stable_seq = it.first.first;
          stable_hash = it.first.second;
        }
      }
      new_data_ = 0;
    }

    // current_stable_seq_ is only written by this thread, so the unlocked
    // read is safe here.
    if (current_stable_seq_ < stable_seq) {
      std::lock_guard<std::mutex> lk(mutex_);
      // The entry is erased below (its seq is <= stable_seq), so move out.
      std::vector<SignatureInfo> votes =
          std::move(sign_ckpt_[std::make_pair(stable_seq, stable_hash)]);

      // Drop all vote state at or below the new stable checkpoint. Erase by
      // key so a missing sign_ckpt_ entry cannot lead to erase(end()).
      for (auto it = sender_ckpt_.begin(); it != sender_ckpt_.end();) {
        if (it->first.first <= stable_seq) {
          sign_ckpt_.erase(it->first);
          it = sender_ckpt_.erase(it);
        } else {
          ++it;
        }
      }

      stable_ckpt_.set_seq(stable_seq);
      stable_ckpt_.set_hash(stable_hash);
      stable_ckpt_.mutable_signatures()->Clear();
      for (const auto& vote : votes) {
        *stable_ckpt_.add_signatures() = vote;
      }
      current_stable_seq_ = stable_seq;
    }
    UpdateStableCheckPointCallback(current_stable_seq_);
  }
}

// Installs the callback invoked by TimeoutHandler().
void CheckPointManager::SetTimeoutHandler(
    std::function<void()> timeout_handler) {
  timeout_handler_ = std::move(timeout_handler);
}

// Invokes the installed timeout callback, if any.
void CheckPointManager::TimeoutHandler() {
  if (timeout_handler_) {
    timeout_handler_();
  }
}

// Worker loop that consumes committed requests in sequence order. It maintains
// the chained hash (last_hash_) and last_seq_, and stores each request in the
// transaction store. Every `water_mark` sequences it broadcasts a signed
// checkpoint, except for requests replayed during recovery.
// Out-of-order requests are logged and dropped.
void CheckPointManager::UpdateCheckPointStatus() {
  uint64_t last_ckpt_seq = 0;
  int water_mark = config_.GetCheckPointWaterMark();
  int timeout_ms = config_.GetViewchangeCommitTimeout();
  std::vector<std::string> stable_hashs;
  std::vector<uint64_t> stable_seqs;
  while (!stop_) {
    auto request = data_queue_.Pop(timeout_ms);
    if (request == nullptr) {
      continue;
    }
    uint64_t current_seq = request->seq();
    // This thread is the only writer of last_seq_/last_hash_, so unlocked
    // reads here are safe; writes are locked for readers on other threads.
    if (current_seq != last_seq_ + 1) {
      LOG(ERROR) << "seq invalid:" << last_seq_ << " current:" << current_seq;
      continue;
    }
    {
      std::lock_guard<std::mutex> lk(lt_mutex_);
      last_hash_ = GetHash(last_hash_, request->hash());
      last_seq_++;
    }
    bool is_recovery = request->is_recovery();
    txn_db_->Put(std::move(request));

    if (current_seq == last_ckpt_seq + water_mark) {
      last_ckpt_seq = current_seq;
      if (!is_recovery) {
        BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs,
                            stable_seqs);
      }
    }
  }
}

// Broadcasts a CHECKPOINT request for (seq, hash) to all replicas. The hash is
// signed when a verifier is configured; nothing is sent if signing fails.
// `stable_hashs` and `stable_seqs` are currently unused.
void CheckPointManager::BroadcastCheckPoint(
    uint64_t seq, const std::string& hash,
    const std::vector<std::string>& stable_hashs,
    const std::vector<uint64_t>& stable_seqs) {
  CheckPointData checkpoint_data;
  std::unique_ptr<Request> checkpoint_request = NewRequest(
      Request::TYPE_CHECKPOINT, Request(), config_.GetSelfInfo().id());
  checkpoint_data.set_seq(seq);
  checkpoint_data.set_hash(hash);
  if (verifier_) {
    auto signature_or = verifier_->SignMessage(hash);
    if (!signature_or.ok()) {
      LOG(ERROR) << "Sign message fail";
      return;
    }
    *checkpoint_data.mutable_hash_signature() = *signature_or;
  }

  checkpoint_data.SerializeToString(checkpoint_request->mutable_data());
  replica_communicator_->BroadCast(*checkpoint_request);
}

// Blocks until stable_hash_queue_ is non-empty, waiting on signal_.
void CheckPointManager::WaitSignal() {
  std::unique_lock<std::mutex> lk(mutex_);
  signal_.wait(lk, [&] { return !stable_hash_queue_.Empty(); });
}

// Pops the next (seq, hash) pair from stable_hash_queue_.
std::unique_ptr<std::pair<uint64_t, std::string>>
CheckPointManager::PopStableSeqHash() {
  return stable_hash_queue_.Pop();
}

// Returns the highest sequence committed through a checkpoint-driven fetch.
uint64_t CheckPointManager::GetHighestPreparedSeq() {
  std::lock_guard<std::mutex> lk(lt_mutex_);
  return highest_prepared_seq_;
}

// Records the highest sequence committed through a checkpoint-driven fetch.
void CheckPointManager::SetHighestPreparedSeq(uint64_t seq) {
  std::lock_guard<std::mutex> lk(lt_mutex_);
  highest_prepared_seq_ = seq;
}

// Returns the semaphore posted whenever a checkpoint reaches the committable
// vote threshold.
sem_t* CheckPointManager::CommitableSeqSignal() {
  std::lock_guard<std::mutex> lk(lt_mutex_);
  return &committable_seq_signal_;
}

// Returns the most recently observed committable checkpoint sequence.
uint64_t CheckPointManager::GetCommittableSeq() {
  std::lock_guard<std::mutex> lk(lt_mutex_);
  return committable_seq_;
}

}  // namespace resdb