platform/consensus/ordering/pbft/checkpoint_manager.h (79 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <semaphore.h>
#include "chain/state/chain_state.h"
#include "common/crypto/signature_verifier.h"
#include "interface/common/resdb_txn_accessor.h"
#include "platform/config/resdb_config.h"
#include "platform/consensus/checkpoint/checkpoint.h"
#include "platform/consensus/execution/transaction_executor.h"
#include "platform/networkstrate/replica_communicator.h"
#include "platform/networkstrate/server_comm.h"
#include "platform/proto/checkpoint_info.pb.h"
#include "platform/proto/resdb.pb.h"
namespace resdb {
class CheckPointManager : public CheckPoint {
public:
CheckPointManager(const ResDBConfig& config,
ReplicaCommunicator* replica_communicator,
SignatureVerifier* verifier);
virtual ~CheckPointManager();
ChainState* GetTxnDB();
uint64_t GetMaxTxnSeq();
void AddCommitData(std::unique_ptr<Request> request);
int ProcessCheckPoint(std::unique_ptr<Context> context,
std::unique_ptr<Request> request);
uint64_t GetStableCheckpoint() override;
StableCheckPoint GetStableCheckpointWithVotes();
bool IsValidCheckpointProof(const StableCheckPoint& stable_ckpt);
void SetTimeoutHandler(std::function<void()> timeout_handler);
virtual void UpdateStableCheckPointCallback(
int64_t current_stable_checkpoint) {}
void Stop();
void TimeoutHandler();
void WaitSignal();
std::unique_ptr<std::pair<uint64_t, std::string>> PopStableSeqHash();
void SetExecutor(TransactionExecutor* executor) { executor_ = executor; }
uint64_t GetHighestPreparedSeq();
void SetHighestPreparedSeq(uint64_t seq);
sem_t* CommitableSeqSignal();
uint64_t GetCommittableSeq();
private:
void UpdateCheckPointStatus();
void UpdateStableCheckPointStatus();
void BroadcastCheckPoint(uint64_t seq, const std::string& hash,
const std::vector<std::string>& stable_hashs,
const std::vector<uint64_t>& stable_seqs);
void Notify();
bool Wait();
protected:
ResDBConfig config_;
ReplicaCommunicator* replica_communicator_;
std::unique_ptr<ChainState> txn_db_;
std::thread checkpoint_thread_, stable_checkpoint_thread_;
SignatureVerifier* verifier_;
std::atomic<bool> stop_;
std::map<std::pair<uint64_t, std::string>, std::set<uint32_t>> sender_ckpt_;
std::map<std::pair<uint64_t, std::string>, std::vector<SignatureInfo>>
sign_ckpt_;
std::map<std::pair<uint64_t, std::string>, std::vector<std::string>>
hash_ckpt_;
std::atomic<uint64_t> current_stable_seq_;
std::mutex mutex_;
LockFreeQueue<Request> data_queue_;
std::mutex cv_mutex_;
std::condition_variable cv_;
std::function<void()> timeout_handler_;
StableCheckPoint stable_ckpt_;
int new_data_ = 0;
LockFreeQueue<std::pair<uint64_t, std::string>> stable_hash_queue_;
std::condition_variable signal_;
ResDBTxnAccessor txn_accessor_;
std::mutex lt_mutex_;
uint64_t last_seq_ = 0;
TransactionExecutor* executor_;
std::atomic<uint64_t> highest_prepared_seq_;
uint64_t committable_seq_ = 0;
std::string last_hash_, committable_hash_;
sem_t committable_seq_signal_;
};
} // namespace resdb