platform/consensus/ordering/pbft/commitment.cpp (234 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include "platform/consensus/ordering/pbft/commitment.h" #include <glog/logging.h> #include <unistd.h> #include "common/utils/utils.h" #include "platform/consensus/ordering/pbft/transaction_utils.h" namespace resdb { Commitment::Commitment(const ResDBConfig& config, MessageManager* message_manager, ReplicaCommunicator* replica_communicator, SignatureVerifier* verifier) : config_(config), message_manager_(message_manager), stop_(false), replica_communicator_(replica_communicator), verifier_(verifier) { executed_thread_ = std::thread(&Commitment::PostProcessExecutedMsg, this); global_stats_ = Stats::GetGlobalStats(); duplicate_manager_ = std::make_unique<DuplicateManager>(config); message_manager_->SetDuplicateManager(duplicate_manager_.get()); global_stats_->SetProps( config_.GetSelfInfo().id(), config_.GetSelfInfo().ip(), config_.GetSelfInfo().port(), config_.GetConfigData().enable_resview(), config_.GetConfigData().enable_faulty_switch()); global_stats_->SetPrimaryId(message_manager_->GetCurrentPrimary()); } Commitment::~Commitment() { stop_ = true; if (executed_thread_.joinable()) { executed_thread_.join(); } } void Commitment::SetPreVerifyFunc( std::function<bool(const Request& request)> func) { pre_verify_func_ = func; } void Commitment::SetNeedCommitQC(bool need_qc) { need_qc_ = need_qc; } // Handle the user request and send a pre-prepare message to others. // TODO if not a primary, redicet to the primary replica. int Commitment::ProcessNewRequest(std::unique_ptr<Context> context, std::unique_ptr<Request> user_request) { if (context == nullptr || context->signature.signature().empty()) { LOG(ERROR) << "user request doesn't contain signature, reject"; return -2; } if (uint64_t seq = duplicate_manager_->CheckIfExecuted(user_request->hash())) { LOG(ERROR) << "This request is already executed with seq: " << seq; user_request->set_seq(seq); message_manager_->SendResponse(std::move(user_request)); return -2; } if (config_.GetSelfInfo().id() != message_manager_->GetCurrentPrimary()) { // LOG(ERROR) << "current node is not primary. primary:" // << message_manager_->GetCurrentPrimary() // << " seq:" << user_request->seq() // << " hash:" << user_request->hash(); LOG(INFO) << "NOT PRIMARY, Primary is " << message_manager_->GetCurrentPrimary(); replica_communicator_->SendMessage(*user_request, message_manager_->GetCurrentPrimary()); { std::lock_guard<std::mutex> lk(rc_mutex_); request_complained_.push( std::make_pair(std::move(context), std::move(user_request))); } return -3; } /* if(SignatureVerifier::CalculateHash(user_request->data()) != user_request->hash()){ LOG(ERROR) << "the hash and data of the user request don't match, reject"; return -2; } */ // check signatures bool valid = verifier_->VerifyMessage(user_request->data(), user_request->data_signature()); if (!valid) { LOG(ERROR) << "request is not valid:" << user_request->data_signature().DebugString(); LOG(ERROR) << " msg:" << user_request->data().size(); return -2; } if (pre_verify_func_ && !pre_verify_func_(*user_request)) { LOG(ERROR) << " check by the user func fail"; return -2; } global_stats_->IncClientRequest(); if (duplicate_manager_->CheckAndAddProposed(user_request->hash())) { return -2; } auto seq = message_manager_->AssignNextSeq(); // Artificially make the primary stop proposing new trasactions. if (!seq.ok()) { duplicate_manager_->EraseProposed(user_request->hash()); global_stats_->SeqFail(); Request request; request.set_type(Request::TYPE_RESPONSE); request.set_sender_id(config_.GetSelfInfo().id()); request.set_proxy_id(user_request->proxy_id()); request.set_ret(-2); request.set_hash(user_request->hash()); replica_communicator_->SendMessage(request, request.proxy_id()); return -2; } global_stats_->RecordStateTime("request"); user_request->set_type(Request::TYPE_PRE_PREPARE); user_request->set_current_view(message_manager_->GetCurrentView()); user_request->set_seq(*seq); user_request->set_sender_id(config_.GetSelfInfo().id()); user_request->set_primary_id(config_.GetSelfInfo().id()); replica_communicator_->BroadCast(*user_request); return 0; } // Receive the pre-prepare message from the primary. // TODO check whether the sender is the primary. int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context, std::unique_ptr<Request> request) { if (global_stats_->IsFaulty() || context == nullptr || context->signature.signature().empty()) { LOG(ERROR) << "user request doesn't contain signature, reject"; return -2; } if (request->is_recovery()) { if (message_manager_->GetNextSeq() == 0 || request->seq() == message_manager_->GetNextSeq()) { message_manager_->SetNextSeq(request->seq() + 1); } else { LOG(ERROR) << " recovery request not valid:" << " current seq:" << message_manager_->GetNextSeq() << " data seq:" << request->seq(); return 0; } return message_manager_->AddConsensusMsg(context->signature, std::move(request)); } if (request->sender_id() != message_manager_->GetCurrentPrimary()) { LOG(ERROR) << "the request is not from primary. sender:" << request->sender_id() << " seq:" << request->seq(); return -2; } /* if(request->hash() != "null" + std::to_string(request->seq()) && SignatureVerifier::CalculateHash(request->data()) != request->hash()) { LOG(ERROR) << "the hash and data of the request don't match, reject"; return -2; } */ if (request->sender_id() != config_.GetSelfInfo().id()) { if (pre_verify_func_ && !pre_verify_func_(*request)) { LOG(ERROR) << " check by the user func fail"; return -2; } // global_stats_->GetTransactionDetails(std::move(request)); BatchUserRequest batch_request; batch_request.ParseFromString(request->data()); batch_request.clear_createtime(); std::string data; batch_request.SerializeToString(&data); // check signatures bool valid = verifier_->VerifyMessage(request->data(), request->data_signature()); if (!valid) { LOG(ERROR) << "request is not valid:" << request->data_signature().DebugString(); LOG(ERROR) << " msg:" << request->data().size(); return -2; } if (duplicate_manager_->CheckAndAddProposed(request->hash())) { LOG(INFO) << "The request is already proposed, reject"; return -2; } } global_stats_->IncPropose(); global_stats_->RecordStateTime("pre-prepare"); std::unique_ptr<Request> prepare_request = resdb::NewRequest( Request::TYPE_PREPARE, *request, config_.GetSelfInfo().id()); prepare_request->clear_data(); // Add request to message_manager. // If it has received enough same requests(2f+1), broadcast the prepare // message. CollectorResultCode ret = message_manager_->AddConsensusMsg(context->signature, std::move(request)); if (ret == CollectorResultCode::STATE_CHANGED) { replica_communicator_->BroadCast(*prepare_request); } return ret == CollectorResultCode::INVALID ? -2 : 0; } // If receive 2f+1 prepare message, broadcast a commit message. int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context, std::unique_ptr<Request> request) { if (context == nullptr || context->signature.signature().empty()) { LOG(ERROR) << "user request doesn't contain signature, reject"; return -2; } if (request->is_recovery()) { return message_manager_->AddConsensusMsg(context->signature, std::move(request)); } //global_stats_->IncPrepare(); std::unique_ptr<Request> commit_request = resdb::NewRequest( Request::TYPE_COMMIT, *request, config_.GetSelfInfo().id()); commit_request->mutable_data_signature()->Clear(); // Add request to message_manager. // If it has received enough same requests(2f+1), broadcast the commit // message. uint64_t seq_ = request->seq(); CollectorResultCode ret = message_manager_->AddConsensusMsg(context->signature, std::move(request)); if (ret == CollectorResultCode::STATE_CHANGED) { if (message_manager_->GetHighestPreparedSeq() < seq_) { message_manager_->SetHighestPreparedSeq(seq_); } // If need qc, sign the data if (need_qc_ && verifier_) { auto signature_or = verifier_->SignMessage(commit_request->hash()); if (!signature_or.ok()) { LOG(ERROR) << "Sign message fail"; return -2; } *commit_request->mutable_data_signature() = *signature_or; // LOG(ERROR) << "sign hash" // << commit_request->data_signature().DebugString(); } global_stats_->RecordStateTime("prepare"); replica_communicator_->BroadCast(*commit_request); } return ret == CollectorResultCode::INVALID ? -2 : 0; } // If receive 2f+1 commit message, commit the request. int Commitment::ProcessCommitMsg(std::unique_ptr<Context> context, std::unique_ptr<Request> request) { if (context == nullptr || context->signature.signature().empty()) { LOG(ERROR) << "user request doesn't contain signature, reject" << " context:" << (context == nullptr); return -2; } if (request->is_recovery()) { return message_manager_->AddConsensusMsg(context->signature, std::move(request)); } //global_stats_->IncCommit(); // Add request to message_manager. // If it has received enough same requests(2f+1), message manager will // commit the request. CollectorResultCode ret = message_manager_->AddConsensusMsg(context->signature, std::move(request)); if (ret == CollectorResultCode::STATE_CHANGED) { // LOG(ERROR)<<request->data().size(); // global_stats_->GetTransactionDetails(request->data()); global_stats_->RecordStateTime("commit"); } return ret == CollectorResultCode::INVALID ? -2 : 0; } // =========== private threads =========================== // If the transaction is executed, send back to the proxy. int Commitment::PostProcessExecutedMsg() { while (!stop_) { auto batch_resp = message_manager_->GetResponseMsg(); if (batch_resp == nullptr) { continue; } global_stats_->SendSummary(); Request request; request.set_hash(batch_resp->hash()); request.set_seq(batch_resp->seq()); request.set_type(Request::TYPE_RESPONSE); request.set_sender_id(config_.GetSelfInfo().id()); request.set_current_view(batch_resp->current_view()); request.set_proxy_id(batch_resp->proxy_id()); request.set_primary_id(batch_resp->primary_id()); // LOG(ERROR)<<"send back to proxy:"<<batch_resp->proxy_id(); batch_resp->SerializeToString(request.mutable_data()); replica_communicator_->SendMessage(request, request.proxy_id()); } return 0; } DuplicateManager* Commitment::GetDuplicateManager() { return duplicate_manager_.get(); } } // namespace resdb