platform/consensus/ordering/pbft/response_manager.cpp (341 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "platform/consensus/ordering/pbft/response_manager.h"

#include <glog/logging.h>
#include <semaphore.h>
#include <unistd.h>

#include <algorithm>
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>

#include "common/utils/utils.h"

namespace resdb {

// Records that the batch identified by `hash_` should be re-broadcast if no
// response has been collected by `time_` (same clock as GetCurrentTime()).
ResponseClientTimeout::ResponseClientTimeout(std::string hash_,
                                             uint64_t time_) {
  this->hash = std::move(hash_);
  this->timeout_time = time_;
}

ResponseClientTimeout::ResponseClientTimeout(
    const ResponseClientTimeout& other) {
  this->hash = other.hash;
  this->timeout_time = other.timeout_time;
}

// Deliberately inverted: std::priority_queue is a max-heap, so ordering by
// "later deadline is smaller" makes top() return the earliest deadline.
bool ResponseClientTimeout::operator<(
    const ResponseClientTimeout& other) const {
  return timeout_time > other.timeout_time;
}

// Builds the response manager and starts its background threads:
//  - BatchProposeMsg, when this node is a client proxy (or in test mode);
//  - MonitoringClientTimeOut, when view change is enabled.
// All state the threads touch is initialized *before* they are started.
ResponseManager::ResponseManager(const ResDBConfig& config,
                                 ReplicaCommunicator* replica_communicator,
                                 SystemInfo* system_info,
                                 SignatureVerifier* verifier)
    : config_(config),
      replica_communicator_(replica_communicator),
      collector_pool_(std::make_unique<LockFreeCollectorPool>(
          "response", config_.GetMaxProcessTxn(), nullptr)),
      context_pool_(std::make_unique<LockFreeCollectorPool>(
          "context", config_.GetMaxProcessTxn(), nullptr)),
      batch_queue_("user request"),
      system_info_(system_info),
      verifier_(verifier) {
  stop_ = false;
  local_id_ = 1;
  timeout_length_ = 5000000;
  global_stats_ = Stats::GetGlobalStats();
  // send_num_ is incremented by BatchProposeMsg; reset it before that thread
  // exists so the reset cannot race with (and erase) an increment.
  send_num_ = 0;
  // MonitoringClientTimeOut blocks on this semaphore immediately, so it must
  // be a valid, zero-valued semaphore before the thread starts.
  sem_init(&request_sent_signal_, 0, 0);

  if (config_.GetPublicKeyCertificateInfo()
              .public_key()
              .public_key_info()
              .type() == CertificateKeyInfo::CLIENT ||
      config_.IsTestMode()) {
    user_req_thread_ = std::thread(&ResponseManager::BatchProposeMsg, this);
  }
  if (config_.GetConfigData().enable_viewchange()) {
    checking_timeout_thread_ =
        std::thread(&ResponseManager::MonitoringClientTimeOut, this);
  }
}

// Stops and joins the background threads, then releases the semaphore.
ResponseManager::~ResponseManager() {
  stop_ = true;
  // The timeout monitor may be parked in sem_wait with nothing pending; post
  // once so it observes stop_ instead of blocking join() forever.
  sem_post(&request_sent_signal_);
  if (user_req_thread_.joinable()) {
    user_req_thread_.join();
  }
  if (checking_timeout_thread_.joinable()) {
    checking_timeout_thread_.join();
  }
  sem_destroy(&request_sent_signal_);
}

// Returns the current primary replica id as reported by SystemInfo.
int ResponseManager::GetPrimary() { return system_info_->GetPrimaryId(); }

// Stores the client contexts of a batch under its local id so the replies
// can later be routed back to the right clients.
int ResponseManager::AddContextList(
    std::vector<std::unique_ptr<Context>> context_list, uint64_t id) {
  return context_pool_->GetCollector(id)->SetContextList(
      id, std::move(context_list));
}

// Takes back the client contexts stored for `id` and recycles the slot.
std::vector<std::unique_ptr<Context>> ResponseManager::FetchContextList(
    uint64_t id) {
  auto context = context_pool_->GetCollector(id)->FetchContextList(id);
  context_pool_->Update(id);
  return context;
}

// Enqueues a client request for batching. Requests that do not need a
// response have their client channel dropped so no reply is attempted.
int ResponseManager::NewUserRequest(std::unique_ptr<Context> context,
                                    std::unique_ptr<Request> user_request) {
  if (!user_request->need_response()) {
    context->client = nullptr;
  }

  std::unique_ptr<QueueItem> queue_item = std::make_unique<QueueItem>();
  queue_item->context = std::move(context);
  queue_item->user_request = std::move(user_request);

  batch_queue_.Push(std::move(queue_item));
  return 0;
}

// =================== response ========================

// Handles one replica's response to a batch. Once enough matching responses
// have been collected (GetMinClientReceiveNum(); f+1 per the original design),
// the batch response is forwarded to the waiting clients.
// Returns -2 for invalid input, 0 otherwise.
int ResponseManager::ProcessResponseMsg(std::unique_ptr<Context> context,
                                        std::unique_ptr<Request> request) {
  if (context == nullptr || request == nullptr) {
    LOG(ERROR) << "invalid response message";
    return -2;
  }
  if (request->ret() == -2) {
    LOG(ERROR) << "get response fail:" << request->ret();
    send_num_--;
    return 0;
  }

  std::unique_ptr<Request> response;
  // The callback runs synchronously inside AddResponseMsg, exactly once, when
  // the collector transitions the batch to EXECUTED.
  CollectorResultCode ret = AddResponseMsg(
      context->signature, std::move(request),
      [&](const Request& committed,
          const TransactionCollector::CollectorDataType*) {
        response = std::make_unique<Request>(committed);
      });

  if (ret == CollectorResultCode::STATE_CHANGED && response != nullptr) {
    BatchUserResponse batch_response;
    if (batch_response.ParseFromString(response->data())) {
      SendResponseToClient(batch_response);
    } else {
      LOG(ERROR) << "parse response fail:";
    }
  }
  return ret == CollectorResultCode::INVALID ? -2 : 0;
}

// Atomically moves a batch from None to EXECUTED once `received_count`
// reaches the configured client quorum. Returns true only for the caller that
// performed the transition.
bool ResponseManager::MayConsensusChangeStatus(
    int type, int received_count, std::atomic<TransactionStatue>* status) {
  switch (type) {
    case Request::TYPE_RESPONSE:
      if (*status == TransactionStatue::None &&
          config_.GetMinClientReceiveNum() <= received_count) {
        TransactionStatue old_status = TransactionStatue::None;
        // The failure ordering of a CAS must not be release/acq_rel; acquire
        // is the strongest ordering permitted for the failure path.
        return status->compare_exchange_strong(
            old_status, TransactionStatue::EXECUTED, std::memory_order_acq_rel,
            std::memory_order_acquire);
      }
      break;
  }
  return false;
}

// Feeds one response into the collector keyed by the batch's local id.
// Returns INVALID on null/unparsable input or collector rejection,
// STATE_CHANGED when this response completed the quorum (after invoking
// `response_call_back`), and OK otherwise.
CollectorResultCode ResponseManager::AddResponseMsg(
    const SignatureInfo& signature, std::unique_ptr<Request> request,
    std::function<void(const Request&,
                       const TransactionCollector::CollectorDataType*)>
        response_call_back) {
  if (request == nullptr) {
    return CollectorResultCode::INVALID;
  }

  // Copy the hash now: `request` is moved into the collector below.
  std::string hash = request->hash();
  BatchUserResponse batch_response;
  if (!batch_response.ParseFromString(request->data())) {
    LOG(ERROR) << "parse response fail:" << request->data().size()
               << " seq:" << request->seq();
    // NOTE(review): a single malformed response cancels the view-change
    // timeout for this batch; confirm this is intended in a BFT setting.
    RemoveWaitingResponseRequest(hash);
    return CollectorResultCode::INVALID;
  }

  uint64_t seq = batch_response.local_id();
  request->set_seq(seq);
  int type = request->type();
  bool state_changed = false;
  int ret = collector_pool_->GetCollector(seq)->AddRequest(
      std::move(request), signature, false,
      [&](const Request& collected, int received_count,
          TransactionCollector::CollectorDataType* data,
          std::atomic<TransactionStatue>* status, bool /*force*/) {
        if (MayConsensusChangeStatus(type, received_count, status)) {
          state_changed = true;
          response_call_back(collected, data);
        }
      });
  if (ret != 0) {
    return CollectorResultCode::INVALID;
  }
  if (state_changed) {
    collector_pool_->Update(seq);
    RemoveWaitingResponseRequest(hash);
    return CollectorResultCode::STATE_CHANGED;
  }
  return CollectorResultCode::OK;
}

// Records latency for a completed batch and, unless running a performance
// benchmark, sends each client its slot of `batch_response`.
void ResponseManager::SendResponseToClient(
    const BatchUserResponse& batch_response) {
  uint64_t create_time = batch_response.createtime();
  uint64_t local_id = batch_response.local_id();
  if (create_time > 0) {
    uint64_t run_time = GetCurrentTime() - create_time;
    global_stats_->AddLatency(run_time);
  } else {
    LOG(ERROR) << "seq:" << local_id << " no resp";
  }
  send_num_--;

  if (config_.IsPerformanceRunning()) {
    return;
  }

  std::vector<std::unique_ptr<Context>> context_list =
      FetchContextList(batch_response.local_id());
  if (context_list.empty()) {
    LOG(ERROR) << "context list is empty. local id:"
               << batch_response.local_id();
    return;
  }

  // Response i belongs to context i; never read past the responses actually
  // present (a short or malformed batch would otherwise index out of range).
  const size_t response_count =
      static_cast<size_t>(batch_response.response_size());
  if (response_count < context_list.size()) {
    LOG(ERROR) << "response size mismatch. local id:" << local_id
               << " responses:" << response_count
               << " contexts:" << context_list.size();
  }
  const size_t deliverable = std::min(context_list.size(), response_count);
  for (size_t i = 0; i < deliverable; ++i) {
    auto& context = context_list[i];
    if (context == nullptr || context->client == nullptr) {
      LOG(ERROR) << " no channel:";
      continue;
    }
    int ret = context->client->SendRawMessageData(
        batch_response.response(static_cast<int>(i)));
    if (ret) {
      LOG(ERROR) << "send resp fail ret:" << ret;
    }
  }
}

// =================== request ========================

// Background loop: collects queued client requests into batches of up to
// ClientBatchNum() (or whatever arrived within ClientBatchWaitTimeMS()),
// proposes each batch, and reports failures back to the affected clients.
int ResponseManager::BatchProposeMsg() {
  LOG(INFO) << "batch wait time:" << config_.ClientBatchWaitTimeMS()
            << " batch num:" << config_.ClientBatchNum();
  std::vector<std::unique_ptr<QueueItem>> batch_req;
  while (!stop_) {
    // Back-pressure: too many batches in flight.
    if (send_num_ > config_.GetMaxProcessTxn()) {
      LOG(ERROR) << "send num too high, wait:" << send_num_;
      usleep(100);
      continue;
    }
    if (batch_req.size() < config_.ClientBatchNum()) {
      std::unique_ptr<QueueItem> item =
          batch_queue_.Pop(config_.ClientBatchWaitTimeMS());
      if (item != nullptr) {
        batch_req.push_back(std::move(item));
        if (batch_req.size() < config_.ClientBatchNum()) {
          continue;
        }
      }
    }
    if (batch_req.empty()) {
      continue;
    }

    int ret = DoBatch(batch_req);
    if (ret != 0) {
      // Must run before batch_req is cleared. Only clients whose context was
      // not already handed off by DoBatch can still be answered here.
      Response response;
      response.set_result(Response::ERROR);
      for (const auto& item : batch_req) {
        if (item->context && item->context->client) {
          int send_ret = item->context->client->SendRawMessage(response);
          if (send_ret) {
            LOG(ERROR) << "send resp" << response.DebugString()
                       << " fail ret:" << send_ret;
          }
        }
      }
    }
    batch_req.clear();
  }
  return 0;
}

// Packs `batch_req` into one signed TYPE_NEW_TXNS request, registers the
// client contexts for reply routing, sends it to the primary and arms the
// view-change timeout. Returns 0 on success, non-zero on failure.
// Client contexts are taken out of `batch_req` only after signing succeeds,
// so a signing failure still lets the caller answer those clients.
int ResponseManager::DoBatch(
    const std::vector<std::unique_ptr<QueueItem>>& batch_req) {
  auto new_request = NewRequest(Request::TYPE_NEW_TXNS, Request(),
                                config_.GetSelfInfo().id());
  if (new_request == nullptr) {
    return -2;
  }

  BatchUserRequest batch_request;
  for (size_t i = 0; i < batch_req.size(); ++i) {
    BatchUserRequest::UserRequest* req = batch_request.add_user_requests();
    *req->mutable_request() = *batch_req[i]->user_request;
    *req->mutable_signature() = batch_req[i]->context->signature;
    req->set_id(i);
  }

  const bool track_contexts = !config_.IsPerformanceRunning();
  if (track_contexts) {
    batch_request.set_local_id(local_id_);
  }
  batch_request.set_createtime(GetCurrentTime());

  // Serialize once; the very bytes that are signed are the bytes sent.
  std::string data;
  batch_request.SerializeToString(&data);
  if (verifier_) {
    auto signature_or = verifier_->SignMessage(data);
    if (!signature_or.ok()) {
      LOG(ERROR) << "Sign message fail";
      return -2;
    }
    *new_request->mutable_data_signature() = *signature_or;
  }

  std::vector<std::unique_ptr<Context>> context_list;
  context_list.reserve(batch_req.size());
  for (const auto& item : batch_req) {
    context_list.push_back(std::move(item->context));
  }
  if (track_contexts) {
    LOG(ERROR) << "add context list:" << new_request->seq()
               << " list size:" << context_list.size()
               << " local_id:" << local_id_;
    int ret = AddContextList(std::move(context_list), local_id_++);
    if (ret != 0) {
      LOG(ERROR) << "add context list fail:";
      return ret;
    }
  }

  *new_request->mutable_data() = std::move(data);
  new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data()));
  new_request->set_proxy_id(config_.GetSelfInfo().id());
  replica_communicator_->SendMessage(*new_request, GetPrimary());
  send_num_++;
  AddWaitingResponseRequest(std::move(new_request));
  return 0;
}

// When view change is enabled, remembers `request` until its response quorum
// arrives and schedules a timeout after which it is broadcast to all replicas.
void ResponseManager::AddWaitingResponseRequest(
    std::unique_ptr<Request> request) {
  if (!config_.GetConfigData().enable_viewchange()) {
    return;
  }
  assert(timeout_length_ > 0);
  std::string hash = request->hash();
  uint64_t time = GetCurrentTime() + timeout_length_;
  {
    std::lock_guard<decltype(pm_lock_)> lock(pm_lock_);
    client_timeout_min_heap_.push(ResponseClientTimeout(hash, time));
    waiting_response_batches_.insert(
        std::make_pair(std::move(hash), std::move(request)));
  }
  // One post per scheduled timeout; MonitoringClientTimeOut consumes them.
  sem_post(&request_sent_signal_);
}

// Forgets the waiting request `hash` (no-op if absent or view change is off).
void ResponseManager::RemoveWaitingResponseRequest(const std::string& hash) {
  if (!config_.GetConfigData().enable_viewchange()) {
    return;
  }
  std::lock_guard<decltype(pm_lock_)> lock(pm_lock_);
  waiting_response_batches_.erase(hash);
}

// Returns true while `hash` is still awaiting its response quorum.
bool ResponseManager::CheckTimeOut(std::string hash) {
  std::lock_guard<decltype(pm_lock_)> lock(pm_lock_);
  return waiting_response_batches_.find(hash) !=
         waiting_response_batches_.end();
}

// Moves the waiting request for `hash` out of the table. Returns nullptr if
// it has already been answered/removed (or was already taken).
std::unique_ptr<Request> ResponseManager::GetTimeOutRequest(std::string hash) {
  std::lock_guard<decltype(pm_lock_)> lock(pm_lock_);
  auto it = waiting_response_batches_.find(hash);
  if (it == waiting_response_batches_.end()) {
    return nullptr;
  }
  return std::move(it->second);
}

// Background loop: for each scheduled timeout (earliest first), sleeps until
// its deadline and, if the batch is still unanswered, broadcasts it to every
// replica so a faulty primary can be detected.
void ResponseManager::MonitoringClientTimeOut() {
  while (!stop_) {
    sem_wait(&request_sent_signal_);
    if (stop_) {
      break;
    }

    std::unique_lock<decltype(pm_lock_)> lock(pm_lock_);
    if (client_timeout_min_heap_.empty()) {
      continue;
    }
    ResponseClientTimeout client_timeout = client_timeout_min_heap_.top();
    client_timeout_min_heap_.pop();
    lock.unlock();

    // Read the clock once so the comparison and the subtraction agree; two
    // reads could underflow the unsigned difference into a huge sleep.
    uint64_t now = GetCurrentTime();
    if (client_timeout.timeout_time > now) {
      usleep(client_timeout.timeout_time - now);
    }
    if (stop_) {
      break;
    }

    // GetTimeOutRequest returns nullptr if the response arrived meanwhile.
    auto request = GetTimeOutRequest(client_timeout.hash);
    if (request) {
      replica_communicator_->BroadCast(*request);
    }
  }
}

}  // namespace resdb