platform/consensus/ordering/common/framework/consensus.cpp (124 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include "platform/consensus/ordering/common/framework/consensus.h" #include <glog/logging.h> #include <unistd.h> #include "common/utils/utils.h" namespace resdb { namespace common { Consensus::Consensus(const ResDBConfig& config, std::unique_ptr<TransactionManager> executor) : ConsensusManager(config), replica_communicator_(GetBroadCastClient()), transaction_executor_(std::make_unique<TransactionExecutor>( config, [&](std::unique_ptr<Request> request, std::unique_ptr<BatchUserResponse> resp_msg) { ResponseMsg(*resp_msg); }, nullptr, std::move(executor))) { LOG(INFO) << "is running is performance mode:" << config_.IsPerformanceRunning(); is_stop_ = false; global_stats_ = Stats::GetGlobalStats(); } void Consensus::Init() { if (performance_manager_ == nullptr) { performance_manager_ = config_.IsPerformanceRunning() ? std::make_unique<PerformanceManager>( config_, GetBroadCastClient(), GetSignatureVerifier()) : nullptr; } if (response_manager_ == nullptr) { response_manager_ = !config_.IsPerformanceRunning() ? std::make_unique<ResponseManager>(config_, GetBroadCastClient(), GetSignatureVerifier()) : nullptr; } } void Consensus::InitProtocol(ProtocolBase* protocol) { protocol->SetSingleCallFunc( [&](int type, const google::protobuf::Message& msg, int node_id) { return SendMsg(type, msg, node_id); }); protocol->SetBroadcastCallFunc( [&](int type, const google::protobuf::Message& msg) { return Broadcast(type, msg); }); protocol->SetCommitFunc( [&](const google::protobuf::Message& msg) { return CommitMsg(msg); }); } Consensus::~Consensus() { is_stop_ = true; } void Consensus::SetPerformanceManager( std::unique_ptr<PerformanceManager> performance_manager) { performance_manager_ = std::move(performance_manager); } bool Consensus::IsStop() { return is_stop_; } void Consensus::SetupPerformanceDataFunc(std::function<std::string()> func) { performance_manager_->SetDataFunc(func); } void Consensus::SetCommunicator(ReplicaCommunicator* replica_communicator) { replica_communicator_ = replica_communicator; } int Consensus::Broadcast(int type, const google::protobuf::Message& msg) { Request request; msg.SerializeToString(request.mutable_data()); request.set_type(Request::TYPE_CUSTOM_CONSENSUS); request.set_user_type(type); request.set_sender_id(config_.GetSelfInfo().id()); replica_communicator_->BroadCast(request); return 0; } int Consensus::SendMsg(int type, const google::protobuf::Message& msg, int node_id) { Request request; msg.SerializeToString(request.mutable_data()); request.set_type(Request::TYPE_CUSTOM_CONSENSUS); request.set_user_type(type); request.set_sender_id(config_.GetSelfInfo().id()); replica_communicator_->SendMessage(request, node_id); return 0; } std::vector<ReplicaInfo> Consensus::GetReplicas() { return config_.GetReplicaInfos(); } int Consensus::CommitMsg(const google::protobuf::Message& txn) { return 0; } // The implementation of PBFT. int Consensus::ConsensusCommit(std::unique_ptr<Context> context, std::unique_ptr<Request> request) { switch (request->type()) { case Request::TYPE_CLIENT_REQUEST: if (config_.IsPerformanceRunning()) { return performance_manager_->StartEval(); } case Request::TYPE_RESPONSE: if (config_.IsPerformanceRunning()) { return performance_manager_->ProcessResponseMsg(std::move(context), std::move(request)); } case Request::TYPE_NEW_TXNS: { return ProcessNewTransaction(std::move(request)); } case Request::TYPE_CUSTOM_CONSENSUS: { return ProcessCustomConsensus(std::move(request)); } } return 0; } int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) { return 0; } int Consensus::ProcessNewTransaction(std::unique_ptr<Request> request) { return 0; } int Consensus::ResponseMsg(const BatchUserResponse& batch_resp) { Request request; request.set_seq(batch_resp.seq()); request.set_type(Request::TYPE_RESPONSE); request.set_sender_id(config_.GetSelfInfo().id()); request.set_proxy_id(batch_resp.proxy_id()); batch_resp.SerializeToString(request.mutable_data()); replica_communicator_->SendMessage(request, request.proxy_id()); return 0; } } // namespace common } // namespace resdb