platform/consensus/ordering/pbft/consensus_manager_pbft.cpp (213 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "platform/consensus/ordering/pbft/consensus_manager_pbft.h"
#include <glog/logging.h>
#include <unistd.h>
#include "common/crypto/signature_verifier.h"
namespace resdb {
ConsensusManagerPBFT::ConsensusManagerPBFT(
const ResDBConfig& config, std::unique_ptr<TransactionManager> executor,
std::unique_ptr<CustomQuery> query_executor)
: ConsensusManager(config),
system_info_(std::make_unique<SystemInfo>(config)),
checkpoint_manager_(std::make_unique<CheckPointManager>(
config, GetBroadCastClient(), GetSignatureVerifier())),
message_manager_(std::make_unique<MessageManager>(
config, std::move(executor), checkpoint_manager_.get(),
system_info_.get())),
commitment_(std::make_unique<Commitment>(config_, message_manager_.get(),
GetBroadCastClient(),
GetSignatureVerifier())),
query_(std::make_unique<Query>(config_, message_manager_.get(),
std::move(query_executor))),
response_manager_(config_.IsPerformanceRunning()
? nullptr
: std::make_unique<ResponseManager>(
config_, GetBroadCastClient(),
system_info_.get(), GetSignatureVerifier())),
performance_manager_(config_.IsPerformanceRunning()
? std::make_unique<PerformanceManager>(
config_, GetBroadCastClient(),
system_info_.get(), GetSignatureVerifier())
: nullptr),
view_change_manager_(std::make_unique<ViewChangeManager>(
config_, checkpoint_manager_.get(), message_manager_.get(),
system_info_.get(), GetBroadCastClient(), GetSignatureVerifier())),
recovery_(std::make_unique<Recovery>(config_, checkpoint_manager_.get(),
system_info_.get(),
message_manager_->GetStorage())) {
LOG(INFO) << "is running is performance mode:"
<< config_.IsPerformanceRunning();
global_stats_ = Stats::GetGlobalStats();
view_change_manager_->SetDuplicateManager(commitment_->GetDuplicateManager());
recovery_->ReadLogs(
[&](const SystemInfoData& data) {
system_info_->SetCurrentView(data.view());
system_info_->SetPrimary(data.primary_id());
},
[&](std::unique_ptr<Context> context, std::unique_ptr<Request> request) {
return InternalConsensusCommit(std::move(context), std::move(request));
});
}
void ConsensusManagerPBFT::SetNeedCommitQC(bool need_qc) {
commitment_->SetNeedCommitQC(need_qc);
}
void ConsensusManagerPBFT::Start() { ConsensusManager::Start(); }
std::vector<ReplicaInfo> ConsensusManagerPBFT::GetReplicas() {
return message_manager_->GetReplicas();
}
uint32_t ConsensusManagerPBFT::GetPrimary() {
return system_info_->GetPrimaryId();
}
uint32_t ConsensusManagerPBFT::GetVersion() {
return system_info_->GetCurrentView();
}
void ConsensusManagerPBFT::SetPrimary(uint32_t primary, uint64_t version) {
if (version > system_info_->GetCurrentView()) {
system_info_->SetCurrentView(version);
system_info_->SetPrimary(primary);
}
}
void ConsensusManagerPBFT::AddPendingRequest(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
std::lock_guard<std::mutex> lk(mutex_);
request_pending_.push(std::make_pair(std::move(context), std::move(request)));
}
void ConsensusManagerPBFT::AddComplainedRequest(
std::unique_ptr<Context> context, std::unique_ptr<Request> request) {
std::lock_guard<std::mutex> lk(mutex_);
request_complained_.push(
std::make_pair(std::move(context), std::move(request)));
}
absl::StatusOr<std::pair<std::unique_ptr<Context>, std::unique_ptr<Request>>>
ConsensusManagerPBFT::PopPendingRequest() {
std::lock_guard<std::mutex> lk(mutex_);
if (request_pending_.empty()) {
// LOG(ERROR) << "empty:";
return absl::InternalError("No Data.");
}
auto new_request = std::move(request_pending_.front());
request_pending_.pop();
return new_request;
}
absl::StatusOr<std::pair<std::unique_ptr<Context>, std::unique_ptr<Request>>>
ConsensusManagerPBFT::PopComplainedRequest() {
std::lock_guard<std::mutex> lk(mutex_);
if (request_complained_.empty()) {
// LOG(ERROR) << "empty:";
return absl::InternalError("No Data.");
}
auto new_request = std::move(request_complained_.front());
request_complained_.pop();
return new_request;
}
// The implementation of PBFT.
int ConsensusManagerPBFT::ConsensusCommit(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
// LOG(INFO) << "recv impl type:" << request->type() << " "
// << "sender id:" << request->sender_id();
// If it is in viewchange, push the request to the queue
// for the requests from the new view which come before
// the local new view done.
recovery_->AddRequest(context.get(), request.get());
if (config_.GetConfigData().enable_viewchange()) {
view_change_manager_->MayStart();
if (view_change_manager_->IsInViewChange()) {
switch (request->type()) {
case Request::TYPE_NEW_TXNS:
case Request::TYPE_PRE_PREPARE:
case Request::TYPE_PREPARE:
case Request::TYPE_COMMIT:
AddPendingRequest(std::move(context), std::move(request));
return 0;
}
} else {
while (true) {
auto new_request = PopPendingRequest();
if (!new_request.ok()) {
break;
}
InternalConsensusCommit(std::move((*new_request).first),
std::move((*new_request).second));
}
}
}
int ret = InternalConsensusCommit(std::move(context), std::move(request));
if (config_.GetConfigData().enable_viewchange()) {
if (ret == -4) {
while (true) {
auto new_request = PopComplainedRequest();
if (!new_request.ok()) {
break;
}
// LOG(ERROR) << "[POP COMPLAINED REQUEST]";
InternalConsensusCommit(std::move((*new_request).first),
std::move((*new_request).second));
}
}
}
return ret;
}
int ConsensusManagerPBFT::InternalConsensusCommit(
std::unique_ptr<Context> context, std::unique_ptr<Request> request) {
// LOG(INFO) << "recv impl type:" << request->type() << " "
// << "sender id:" << request->sender_id()<<" seq:"<<request->seq();
switch (request->type()) {
case Request::TYPE_CLIENT_REQUEST:
if (config_.IsPerformanceRunning()) {
return performance_manager_->StartEval();
}
return response_manager_->NewUserRequest(std::move(context),
std::move(request));
case Request::TYPE_RESPONSE:
if (config_.IsPerformanceRunning()) {
return performance_manager_->ProcessResponseMsg(std::move(context),
std::move(request));
}
return response_manager_->ProcessResponseMsg(std::move(context),
std::move(request));
case Request::TYPE_NEW_TXNS: {
uint64_t proxy_id = request->proxy_id();
std::string hash = request->hash();
int ret = commitment_->ProcessNewRequest(std::move(context),
std::move(request));
if (ret == -3) {
LOG(ERROR) << "BAD RETURN";
std::pair<std::unique_ptr<Context>, std::unique_ptr<Request>>
request_complained;
{
std::lock_guard<std::mutex> lk(commitment_->rc_mutex_);
request_complained =
std::move(commitment_->request_complained_.front());
commitment_->request_complained_.pop();
}
AddComplainedRequest(std::move(request_complained.first),
std::move(request_complained.second));
view_change_manager_->AddComplaintTimer(proxy_id, hash);
}
return ret;
}
case Request::TYPE_PRE_PREPARE:
return commitment_->ProcessProposeMsg(std::move(context),
std::move(request));
case Request::TYPE_PREPARE:
return commitment_->ProcessPrepareMsg(std::move(context),
std::move(request));
case Request::TYPE_COMMIT:
return commitment_->ProcessCommitMsg(std::move(context),
std::move(request));
case Request::TYPE_CHECKPOINT:
return checkpoint_manager_->ProcessCheckPoint(std::move(context),
std::move(request));
case Request::TYPE_VIEWCHANGE:
return view_change_manager_->ProcessViewChange(std::move(context),
std::move(request));
case Request::TYPE_NEWVIEW:
return view_change_manager_->ProcessNewView(std::move(context),
std::move(request));
case Request::TYPE_QUERY:
return query_->ProcessQuery(std::move(context), std::move(request));
case Request::TYPE_REPLICA_STATE:
return query_->ProcessGetReplicaState(std::move(context),
std::move(request));
case Request::TYPE_CUSTOM_QUERY:
return query_->ProcessCustomQuery(std::move(context), std::move(request));
}
return 0;
}
void ConsensusManagerPBFT::SetupPerformanceDataFunc(
std::function<std::string()> func) {
performance_manager_->SetDataFunc(func);
}
void ConsensusManagerPBFT::SetPreVerifyFunc(
std::function<bool(const Request&)> func) {
commitment_->SetPreVerifyFunc(func);
}
} // namespace resdb