platform/consensus/ordering/pbft/message_manager.cpp (226 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "platform/consensus/ordering/pbft/message_manager.h"
#include <glog/logging.h>
#include "common/utils/utils.h"
namespace resdb {
// Builds the transaction executor (together with its per-request callback)
// and the collector pool, then registers the executor with the checkpoint
// manager.
//
// `checkpoint_manager` is dereferenced unconditionally below (GetTxnDB and
// SetExecutor), so it must not be null.
MessageManager::MessageManager(
    const ResDBConfig& config,
    std::unique_ptr<TransactionManager> transaction_manager,
    CheckPointManager* checkpoint_manager, SystemInfo* system_info)
    : config_(config),
      queue_("executed"),
      txn_db_(checkpoint_manager->GetTxnDB()),
      system_info_(system_info),
      checkpoint_manager_(checkpoint_manager),
      transaction_executor_(std::make_unique<TransactionExecutor>(
          config,
          // Callback handed to the executor. It only touches members, so
          // capturing `this` is all that is needed.
          [this](std::unique_ptr<Request> request,
                 std::unique_ptr<BatchUserResponse> resp_msg) {
            // Recovery requests produce no client response; everything else
            // gets its response stamped and, if required, queued.
            if (!request->is_recovery()) {
              resp_msg->set_proxy_id(request->proxy_id());
              resp_msg->set_seq(request->seq());
              resp_msg->set_current_view(request->current_view());
              resp_msg->set_primary_id(GetCurrentPrimary());
              if (transaction_executor_->NeedResponse() &&
                  resp_msg->proxy_id() != 0) {
                queue_.Push(std::move(resp_msg));
              }
            }
            // Both kinds of request are forwarded to the checkpoint manager.
            if (checkpoint_manager_ != nullptr) {
              checkpoint_manager_->AddCommitData(std::move(request));
            }
          },
          system_info_, std::move(transaction_manager))),
      collector_pool_(std::make_unique<LockFreeCollectorPool>(
          "txn", config_.GetMaxProcessTxn(), transaction_executor_.get(),
          config_.GetConfigData().enable_viewchange())) {
  global_stats_ = Stats::GetGlobalStats();
  // Forward every sequence update from the executor to the collector pool.
  transaction_executor_->SetSeqUpdateNotifyFunc(
      [this](uint64_t seq) { collector_pool_->Update(seq - 1); });
  checkpoint_manager_->SetExecutor(transaction_executor_.get());
}
// Stops the transaction executor, when one exists, before the members are
// torn down.
MessageManager::~MessageManager() {
  if (transaction_executor_ == nullptr) {
    return;
  }
  transaction_executor_->Stop();
}
// Takes one response out of the "executed" queue and transfers its ownership
// to the caller. The result is whatever queue_.Pop() yields.
std::unique_ptr<BatchUserResponse> MessageManager::GetResponseMsg() {
  std::unique_ptr<BatchUserResponse> response = queue_.Pop();
  return response;
}
// Returns the primary id currently reported by system_info_.
int64_t MessageManager::GetCurrentPrimary() const {
  const int64_t primary_id = system_info_->GetPrimaryId();
  return primary_id;
}
// Returns the view number currently reported by system_info_.
uint64_t MessageManager::GetCurrentView() const {
  const uint64_t view = system_info_->GetCurrentView();
  return view;
}
void MessageManager::SetNextSeq(uint64_t seq) {
next_seq_ = seq;
LOG(ERROR) << "set next seq:" << next_seq_;
}
int64_t MessageManager::GetNextSeq() { return next_seq_; }
// Reserves and returns the next sequence number.
//
// Returns InvalidArgumentError when the gap between next_seq_ and the
// executor's max pending executed seq is larger than
// config_.GetMaxProcessTxn(); in that case next_seq_ is left unchanged.
absl::StatusOr<uint64_t> MessageManager::AssignNextSeq() {
  std::unique_lock<std::mutex> lk(seq_mutex_);
  // Keep the full 64-bit value: sequence numbers are handled as uint64_t
  // elsewhere in this file, and narrowing to 32 bits would make the gap below
  // wrong as soon as the executed seq exceeds 2^32 - 1.
  const uint64_t max_executed_seq =
      transaction_executor_->GetMaxPendingExecutedSeq();
  const uint64_t seq_gap = next_seq_ - max_executed_seq;
  global_stats_->SeqGap(seq_gap);
  if (seq_gap > static_cast<uint64_t>(config_.GetMaxProcessTxn())) {
    return absl::InvalidArgumentError("Seq has been used up.");
  }
  return next_seq_++;
}
// Returns the replica list currently reported by system_info_.
std::vector<ReplicaInfo> MessageManager::GetReplicas() {
  std::vector<ReplicaInfo> replicas = system_info_->GetReplicas();
  return replicas;
}
// Check if the request is valid.
// 1. view is the same as the current view
// 2. seq is larger or equal than the next execute seq.
// 3. inside the water mark.
bool MessageManager::IsValidMsg(const Request& request) {
if (request.type() == Request::TYPE_RESPONSE) {
return true;
}
// view should be the same as the current one.
if (static_cast<uint64_t>(request.current_view()) != GetCurrentView()) {
LOG(ERROR) << "message view :[" << request.current_view()
<< "] is older than the cur view :[" << GetCurrentView() << "]";
return false;
}
if (static_cast<uint64_t>(request.seq()) <
transaction_executor_->GetMaxPendingExecutedSeq()) {
return false;
}
return true;
}
bool MessageManager::MayConsensusChangeStatus(
int type, int received_count, std::atomic<TransactionStatue>* status,
bool ret) {
switch (type) {
case Request::TYPE_PRE_PREPARE:
if (*status == TransactionStatue::None) {
TransactionStatue old_status = TransactionStatue::None;
return status->compare_exchange_strong(
old_status, TransactionStatue::READY_PREPARE,
std::memory_order_acq_rel, std::memory_order_acq_rel);
}
break;
case Request::TYPE_PREPARE:
if (*status == TransactionStatue::READY_PREPARE &&
config_.GetMinDataReceiveNum() <= received_count) {
TransactionStatue old_status = TransactionStatue::READY_PREPARE;
return status->compare_exchange_strong(
old_status, TransactionStatue::READY_COMMIT,
std::memory_order_acq_rel, std::memory_order_acq_rel);
}
break;
case Request::TYPE_COMMIT:
if (*status == TransactionStatue::READY_COMMIT &&
config_.GetMinDataReceiveNum() <= received_count) {
TransactionStatue old_status = TransactionStatue::READY_COMMIT;
return status->compare_exchange_strong(
old_status, TransactionStatue::READY_EXECUTE,
std::memory_order_acq_rel, std::memory_order_acq_rel);
return true;
}
break;
}
return ret;
}
// Feeds a consensus message (pre-prepare, prepare or commit) into the
// collector that owns its sequence number.
//
// Returns:
//   INVALID       - `request` is null, fails IsValidMsg(), or the collector's
//                   AddRequest() returned something other than 0 or 1.
//   STATE_CHANGED - MayConsensusChangeStatus() reported a transition while
//                   the message was being added.
//   OK            - otherwise.
// When AddRequest() returns 1 the last committed time of the request's proxy
// is refreshed.
CollectorResultCode MessageManager::AddConsensusMsg(
    const SignatureInfo& signature, std::unique_ptr<Request> request) {
  if (!request || !IsValidMsg(*request)) {
    return CollectorResultCode::INVALID;
  }

  // Copy what is needed before ownership of the request is handed over.
  const int type = request->type();
  const uint64_t seq = request->seq();
  const int proxy_id = request->proxy_id();

  bool state_changed = false;
  const int add_result = collector_pool_->GetCollector(seq)->AddRequest(
      std::move(request), signature, type == Request::TYPE_PRE_PREPARE,
      [&](const Request& /*added_request*/, int received_count,
          TransactionCollector::CollectorDataType* /*data*/,
          std::atomic<TransactionStatue>* status, bool force) {
        if (MayConsensusChangeStatus(type, received_count, status, force)) {
          state_changed = true;
        }
      });

  switch (add_result) {
    case 0:
      break;
    case 1:
      SetLastCommittedTime(proxy_id);
      break;
    default:
      return CollectorResultCode::INVALID;
  }

  return state_changed ? CollectorResultCode::STATE_CHANGED
                       : CollectorResultCode::OK;
}
// Collects the committed requests, with their proofs, for every sequence
// number in [min_seq, max_seq] while holding data_mutex_.
//
// Sequence numbers without an entry in committed_data_ are logged and
// skipped. A request without an entry in committed_proof_ is returned with no
// proofs attached.
RequestSet MessageManager::GetRequestSet(uint64_t min_seq, uint64_t max_seq) {
  RequestSet ret;
  std::unique_lock<std::mutex> lk(data_mutex_);
  for (uint64_t seq = min_seq; seq <= max_seq; ++seq) {
    // One lookup per container; operator[] is avoided so that a read never
    // inserts empty entries into the maps.
    const auto data_it = committed_data_.find(seq);
    if (data_it == committed_data_.end()) {
      LOG(ERROR) << "seq :" << seq << " doesn't exist";
      continue;
    }
    RequestWithProof* entry = ret.add_requests();
    *entry->mutable_request() = data_it->second;
    entry->set_seq(seq);

    const auto proof_it = committed_proof_.find(seq);
    if (proof_it == committed_proof_.end()) {
      continue;
    }
    for (const auto& request_info : proof_it->second) {
      RequestWithProof::RequestData* data = entry->add_proofs();
      *data->mutable_request() = *request_info->request;
      *data->mutable_signature() = request_info->signature;
    }
  }
  return ret;
}
// Looks up an executed transaction by sequence number in the transaction DB
// and returns whatever txn_db_->Get() yields for `seq`.
Request* MessageManager::GetRequest(uint64_t seq) {
  Request* executed_request = txn_db_->Get(seq);
  return executed_request;
}
// Returns the prepared proof held by the collector responsible for `seq`.
std::vector<RequestInfo> MessageManager::GetPreparedProof(uint64_t seq) {
  std::vector<RequestInfo> proof =
      collector_pool_->GetCollector(seq)->GetPreparedProof();
  return proof;
}
// Returns the status reported by the collector responsible for `seq`.
TransactionStatue MessageManager::GetTransactionState(uint64_t seq) {
  const TransactionStatue state =
      collector_pool_->GetCollector(seq)->GetStatus();
  return state;
}
// Copies this replica's configuration data into `state`. Always returns 0.
// `state` is dereferenced without a check and must not be null.
int MessageManager::GetReplicaState(ReplicaState* state) {
  auto* replica_config = state->mutable_replica_config();
  *replica_config = config_.GetConfigData();
  return 0;
}
// Returns the storage exposed by the transaction executor.
Storage* MessageManager::GetStorage() {
  Storage* storage = transaction_executor_->GetStorage();
  return storage;
}
void MessageManager::SetLastCommittedTime(uint64_t proxy_id) {
lct_lock_.lock();
last_committed_time_[proxy_id] = GetCurrentTime();
lct_lock_.unlock();
}
uint64_t MessageManager::GetLastCommittedTime(uint64_t proxy_id) {
lct_lock_.lock();
auto value = last_committed_time_[proxy_id];
lct_lock_.unlock();
return value;
}
// Reports whether the collector responsible for `seq` is in the prepared
// state, as answered by its IsPrepared().
bool MessageManager::IsPreapared(uint64_t seq) {
  const bool prepared = collector_pool_->GetCollector(seq)->IsPrepared();
  return prepared;
}
// Returns the highest prepared sequence number tracked by the checkpoint
// manager.
uint64_t MessageManager::GetHighestPreparedSeq() {
  const uint64_t highest_prepared =
      checkpoint_manager_->GetHighestPreparedSeq();
  return highest_prepared;
}
// Forwards the highest prepared sequence number to the checkpoint manager.
void MessageManager::SetHighestPreparedSeq(uint64_t seq) {
  checkpoint_manager_->SetHighestPreparedSeq(seq);
}
// Hands the duplicate manager to the transaction executor.
void MessageManager::SetDuplicateManager(DuplicateManager* manager) {
  auto& executor = *transaction_executor_;
  executor.SetDuplicateManager(manager);
}
void MessageManager::SendResponse(std::unique_ptr<Request> request) {
std::unique_ptr<BatchUserResponse> response =
std::make_unique<BatchUserResponse>();
response->set_createtime(GetCurrentTime());
// response->set_local_id(batch_request.local_id());
response->set_hash(request->hash());
response->set_proxy_id(request->proxy_id());
response->set_seq(request->seq());
response->set_current_view(GetCurrentView());
response->set_primary_id(GetCurrentPrimary());
if (transaction_executor_->NeedResponse() && response->proxy_id() != 0) {
queue_.Push(std::move(response));
}
}
// Returns a non-owning pointer to the collector pool; ownership stays with
// this object.
LockFreeCollectorPool* MessageManager::GetCollectorPool() {
  LockFreeCollectorPool* pool = collector_pool_.get();
  return pool;
}
} // namespace resdb