platform/consensus/ordering/pbft/message_manager.h (74 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <stdint.h>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include "chain/state/chain_state.h"
#include "executor/common/transaction_manager.h"
#include "platform/common/queue/lock_free_queue.h"
#include "platform/config/resdb_config.h"
#include "platform/consensus/ordering/pbft/checkpoint_manager.h"
#include "platform/consensus/ordering/pbft/lock_free_collector_pool.h"
#include "platform/consensus/ordering/pbft/transaction_collector.h"
#include "platform/consensus/ordering/pbft/transaction_utils.h"
#include "platform/networkstrate/server_comm.h"
#include "platform/proto/checkpoint_info.pb.h"
#include "platform/proto/resdb.pb.h"
#include "platform/statistic/stats.h"
namespace resdb {
class MessageManager {
public:
MessageManager(const ResDBConfig& config,
std::unique_ptr<TransactionManager> transaction_manager,
CheckPointManager* checkpoint_manager,
SystemInfo* system_info);
~MessageManager();
absl::StatusOr<uint64_t> AssignNextSeq();
int64_t GetCurrentPrimary() const;
uint64_t GetMinExecutCandidateSeq();
void SetNextSeq(uint64_t seq);
int64_t GetNextSeq();
// Add commit messages and return the number of messages have been received.
// The commit messages only include post(pre-prepare), prepare and commit
// messages. Messages are handled by state (PREPARE,COMMIT,READY_EXECUTE).
// If there are enough messages and the state is changed after adding the
// message, return 1, otherwise return 0. Return -2 if the request is not
// valid.
CollectorResultCode AddConsensusMsg(const SignatureInfo& signature,
std::unique_ptr<Request> request);
// Obtain the request that has been executed from Executor.
// The messages that have been executed from Executor will save inside
// Message Manager. Consensus Service can obtain the message then send back
// to the client proxy.
std::unique_ptr<BatchUserResponse> GetResponseMsg();
// Get committed messages with 2f+1 proof in [min_seq, max_seq].
RequestSet GetRequestSet(uint64_t min_seq, uint64_t max_seq);
// Get the transactions that have been execuited.
Request* GetRequest(uint64_t seq);
// Get the proof info containing the request and signatures
// if the request has been prepared, having received 2f+1
// pre-prepare messages.
std::vector<RequestInfo> GetPreparedProof(uint64_t seq);
TransactionStatue GetTransactionState(uint64_t seq);
// ============= System information ========
// Obtain the current replica list.
std::vector<ReplicaInfo> GetReplicas();
uint64_t GetCurrentView() const;
// Replica State
int GetReplicaState(ReplicaState* state);
std::unique_ptr<Context> FetchClientContext(uint64_t seq);
Storage* GetStorage();
void SetLastCommittedTime(uint64_t proxy_id);
uint64_t GetLastCommittedTime(uint64_t proxy_id);
bool IsPreapared(uint64_t seq);
uint64_t GetHighestPreparedSeq();
void SetHighestPreparedSeq(uint64_t seq);
void SetDuplicateManager(DuplicateManager* manager);
void SendResponse(std::unique_ptr<Request> request);
LockFreeCollectorPool* GetCollectorPool();
private:
bool IsValidMsg(const Request& request);
bool MayConsensusChangeStatus(int type, int received_count,
std::atomic<TransactionStatue>* status,
bool force);
private:
ResDBConfig config_;
uint64_t next_seq_ = 1;
LockFreeQueue<BatchUserResponse> queue_;
ChainState* txn_db_;
SystemInfo* system_info_;
CheckPointManager* checkpoint_manager_;
std::map<uint64_t, std::vector<std::unique_ptr<RequestInfo>>>
committed_proof_;
std::map<uint64_t, Request> committed_data_;
std::mutex data_mutex_, seq_mutex_;
std::unique_ptr<TransactionExecutor> transaction_executor_;
std::unique_ptr<LockFreeCollectorPool> collector_pool_;
Stats* global_stats_;
std::mutex lct_lock_;
std::map<uint64_t, uint64_t> last_committed_time_;
};
} // namespace resdb