platform/consensus/ordering/pbft/performance_manager.cpp
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "platform/consensus/ordering/pbft/performance_manager.h"
#include <glog/logging.h>
#include "common/utils/utils.h"
namespace resdb {
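
// Tracks an outstanding client batch (identified by its hash) together with
// the absolute time at which it should be considered timed out.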
PerformanceClientTimeout::PerformanceClientTimeout(std::string hash_,
uint64_t time_) {
this->hash = hash_;
this->timeout_time = time_;
}
PerformanceClientTimeout::PerformanceClientTimeout(
const PerformanceClientTimeout& other) {
this->hash = other.hash;
this->timeout_time = other.timeout_time;
}
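
// Order by descending timeout_time so that a std::priority_queue of
// PerformanceClientTimeout acts as a min-heap: the earliest deadline on top.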
bool PerformanceClientTimeout::operator<(
const PerformanceClientTimeout& other) const {
return timeout_time > other.timeout_time;
}
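
// Build the response/context collector pools, start the client batching
// threads (only when this node runs with a client certificate), and launch
// the thread that monitors client-side timeouts.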
PerformanceManager::PerformanceManager(
const ResDBConfig& config, ReplicaCommunicator* replica_communicator,
SystemInfo* system_info, SignatureVerifier* verifier)
: config_(config),
replica_communicator_(replica_communicator),
collector_pool_(std::make_unique<LockFreeCollectorPool>(
"response", config_.GetMaxProcessTxn(), nullptr)),
context_pool_(std::make_unique<LockFreeCollectorPool>(
"context", config_.GetMaxProcessTxn(), nullptr)),
batch_queue_("user request"),
system_info_(system_info),
verifier_(verifier) {
stop_ = false;
eval_started_ = false;
eval_ready_future_ = eval_ready_promise_.get_future();
if (config_.GetPublicKeyCertificateInfo()
.public_key()
.public_key_info()
.type() == CertificateKeyInfo::CLIENT) {
for (int i = 0; i < 2; ++i) {
user_req_thread_[i] =
std::thread(&PerformanceManager::BatchProposeMsg, this);
}
}
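  // Rebroadcast batches whose responses do not arrive before their deadline.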
checking_timeout_thread_ =
std::thread(&PerformanceManager::MonitoringClientTimeOut, this);
global_stats_ = Stats::GetGlobalStats();
  // Replica ids are 1-based, so reserve GetReplicaNum() + 1 counters.
  for (size_t i = 0; i <= config_.GetReplicaNum(); i++) {
    send_num_.push_back(0);
  }
  total_num_ = 0;
  timeout_length_ = 100000000;  // 100s (the value is in microseconds).
}
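
// Stop all worker threads; only the threads actually started in the
// constructor are joinable.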
PerformanceManager::~PerformanceManager() {
stop_ = true;
for (int i = 0; i < 16; ++i) {
if (user_req_thread_[i].joinable()) {
user_req_thread_[i].join();
}
}
if (checking_timeout_thread_.joinable()) {
checking_timeout_thread_.join();
}
}
// Ask the shared system info for the id of the current primary.
int PerformanceManager::GetPrimary() { return system_info_->GetPrimaryId(); }
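
// Build a user request whose payload is produced by the registered data
// function.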
std::unique_ptr<Request> PerformanceManager::GenerateUserRequest() {
std::unique_ptr<Request> request = std::make_unique<Request>();
request->set_data(data_func_());
return request;
}
void PerformanceManager::SetDataFunc(std::function<std::string()> func) {
data_func_ = std::move(func);
}
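
// Pre-generate the evaluation workload: fill the batch queue with 60M
// requests and fulfil the ready promise after the first 2M so the batching
// threads can start proposing while generation continues.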
int PerformanceManager::StartEval() {
if (eval_started_) {
return 0;
}
eval_started_ = true;
for (int i = 0; i < 60000000; ++i) {
std::unique_ptr<QueueItem> queue_item = std::make_unique<QueueItem>();
queue_item->context = nullptr;
queue_item->user_request = GenerateUserRequest();
batch_queue_.Push(std::move(queue_item));
if (i == 2000000) {
eval_ready_promise_.set_value(true);
}
}
LOG(WARNING) << "start eval done";
return 0;
}
// =================== response ========================
// Handle response messages from the replicas. Once f+1 matching responses
// have been received, the result is sent back to the client.
int PerformanceManager::ProcessResponseMsg(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
std::unique_ptr<Request> response;
std::string hash = request->hash();
int32_t primary_id = request->primary_id();
uint64_t seq = request->seq();
  // A return code of -2 indicates the request failed on the replica; stop
  // tracking the batch and ignore the message.
  if (request->ret() == -2) {
    RemoveWaitingResponseRequest(hash);
    return 0;
  }
  // Add the response message and use the callback to collect the received
  // messages. The callback is triggered once f+1 messages have been received.
CollectorResultCode ret =
AddResponseMsg(context->signature, std::move(request),
[&](const Request& request,
const TransactionCollector::CollectorDataType*) {
response = std::make_unique<Request>(request);
return;
});
if (ret == CollectorResultCode::STATE_CHANGED) {
BatchUserResponse batch_response;
if (batch_response.ParseFromString(response->data())) {
if (seq > highest_seq_) {
highest_seq_ = seq;
if (highest_seq_primary_id_ != primary_id) {
system_info_->SetPrimary(primary_id);
highest_seq_primary_id_ = primary_id;
}
}
SendResponseToClient(batch_response);
RemoveWaitingResponseRequest(hash);
} else {
LOG(ERROR) << "parse response fail:";
}
}
return ret == CollectorResultCode::INVALID ? -2 : 0;
}
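
// Decide whether enough messages of this type have arrived to change the
// transaction status. Only the thread that wins the compare-and-swap returns
// true, so the client is acked exactly once.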
bool PerformanceManager::MayConsensusChangeStatus(
int type, int received_count, std::atomic<TransactionStatue>* status) {
switch (type) {
case Request::TYPE_RESPONSE:
      // If f+1 responses have been received, ack the caller exactly once by
      // moving the status from None to EXECUTED.
      if (*status == TransactionStatue::None &&
          config_.GetMinClientReceiveNum() <= received_count) {
        TransactionStatue old_status = TransactionStatue::None;
        // The failure memory order of compare_exchange_strong must not be
        // acq_rel; use acquire on failure.
        return status->compare_exchange_strong(
            old_status, TransactionStatue::EXECUTED, std::memory_order_acq_rel,
            std::memory_order_acquire);
}
break;
}
return false;
}
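
// Feed a response into the per-sequence TransactionCollector. Returns
// STATE_CHANGED when this response moved the transaction to EXECUTED, OK when
// it was merely recorded, and INVALID when the message is missing, cannot be
// parsed, or is rejected by the collector.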
CollectorResultCode PerformanceManager::AddResponseMsg(
const SignatureInfo& signature, std::unique_ptr<Request> request,
std::function<void(const Request&,
const TransactionCollector::CollectorDataType*)>
response_call_back) {
if (request == nullptr) {
return CollectorResultCode::INVALID;
}
std::unique_ptr<BatchUserResponse> batch_response =
std::make_unique<BatchUserResponse>();
if (!batch_response->ParseFromString(request->data())) {
LOG(ERROR) << "parse response fail:" << request->data().size()
<< " seq:" << request->seq();
return CollectorResultCode::INVALID;
}
  int type = request->type();
  uint64_t seq = request->seq();
int resp_received_count = 0;
int ret = collector_pool_->GetCollector(seq)->AddRequest(
std::move(request), signature, false,
[&](const Request& request, int received_count,
TransactionCollector::CollectorDataType* data,
std::atomic<TransactionStatue>* status, bool force) {
if (MayConsensusChangeStatus(type, received_count, status)) {
resp_received_count = 1;
response_call_back(request, data);
}
});
if (ret != 0) {
return CollectorResultCode::INVALID;
}
if (resp_received_count > 0) {
collector_pool_->Update(seq);
return CollectorResultCode::STATE_CHANGED;
}
return CollectorResultCode::OK;
}
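
// Record the end-to-end latency of the batch and release one in-flight slot
// for the primary that served it.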
void PerformanceManager::SendResponseToClient(
const BatchUserResponse& batch_response) {
uint64_t create_time = batch_response.createtime();
uint64_t local_id = batch_response.local_id();
if (create_time > 0) {
uint64_t run_time = GetCurrentTime() - create_time;
global_stats_->AddLatency(run_time);
} else {
LOG(ERROR) << "seq:" << local_id << " no resp";
}
{
// std::lock_guard<std::mutex> lk(mutex_);
if (send_num_[batch_response.primary_id()] > 0) {
send_num_[batch_response.primary_id()]--;
}
}
if (config_.IsPerformanceRunning()) {
return;
}
}
// =================== request ========================
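// Batching loop run by each client thread: pull user requests off the queue
// until ClientBatchNum of them are collected (or the wait times out), then
// propose them as one batch, throttled by GetMaxProcessTxn per primary.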
int PerformanceManager::BatchProposeMsg() {
LOG(WARNING) << "batch wait time:" << config_.ClientBatchWaitTimeMS()
<< " batch num:" << config_.ClientBatchNum()
<< " max txn:" << config_.GetMaxProcessTxn();
std::vector<std::unique_ptr<QueueItem>> batch_req;
eval_ready_future_.get();
while (!stop_) {
// std::lock_guard<std::mutex> lk(mutex_);
if (send_num_[GetPrimary()] >= config_.GetMaxProcessTxn()) {
usleep(100000);
continue;
}
if (batch_req.size() < config_.ClientBatchNum()) {
std::unique_ptr<QueueItem> item =
batch_queue_.Pop(config_.ClientBatchWaitTimeMS());
if (item == nullptr) {
continue;
}
batch_req.push_back(std::move(item));
if (batch_req.size() < config_.ClientBatchNum()) {
continue;
}
}
    int ret = DoBatch(batch_req);
    if (ret != 0) {
      // Report the failure back to every client that provided a context.
      Response response;
      response.set_result(Response::ERROR);
      for (size_t i = 0; i < batch_req.size(); ++i) {
        if (batch_req[i]->context && batch_req[i]->context->client) {
          int send_ret =
              batch_req[i]->context->client->SendRawMessage(response);
          if (send_ret) {
            LOG(ERROR) << "send resp" << response.DebugString()
                       << " fail ret:" << send_ret;
          }
        }
      }
    }
    // Clear the batch only after the error responses (if any) have been sent.
    batch_req.clear();
}
return 0;
}
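
// Pack the collected user requests into a single BatchUserRequest, sign and
// hash it, send it to the current primary, and track it for timeout-based
// rebroadcast. The manager stops itself after one million batches.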
int PerformanceManager::DoBatch(
const std::vector<std::unique_ptr<QueueItem>>& batch_req) {
auto new_request =
NewRequest(Request::TYPE_NEW_TXNS, Request(), config_.GetSelfInfo().id());
if (new_request == nullptr) {
return -2;
}
std::vector<std::unique_ptr<Context>> context_list;
BatchUserRequest batch_request;
for (size_t i = 0; i < batch_req.size(); ++i) {
BatchUserRequest::UserRequest* req = batch_request.add_user_requests();
*req->mutable_request() = *batch_req[i]->user_request.get();
if (batch_req[i]->context) {
*req->mutable_signature() = batch_req[i]->context->signature;
}
req->set_id(i);
}
batch_request.set_createtime(GetCurrentTime());
batch_request.set_local_id(local_id_++);
batch_request.SerializeToString(new_request->mutable_data());
if (verifier_) {
auto signature_or = verifier_->SignMessage(new_request->data());
if (!signature_or.ok()) {
LOG(ERROR) << "Sign message fail";
return -2;
}
*new_request->mutable_data_signature() = *signature_or;
}
new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data()));
new_request->set_proxy_id(config_.GetSelfInfo().id());
replica_communicator_->SendMessage(*new_request, GetPrimary());
global_stats_->BroadCastMsg();
send_num_[GetPrimary()]++;
if (total_num_++ == 1000000) {
stop_ = true;
LOG(WARNING) << "total num is done:" << total_num_;
}
if (total_num_ % 10000 == 0) {
LOG(WARNING) << "total num is :" << total_num_;
}
global_stats_->IncClientCall();
AddWaitingResponseRequest(std::move(new_request));
return 0;
}
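
// When view change is enabled, remember the outstanding batch and schedule a
// deadline so it can be rebroadcast if no response arrives in time.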
void PerformanceManager::AddWaitingResponseRequest(
std::unique_ptr<Request> request) {
if (!config_.GetConfigData().enable_viewchange()) {
return;
}
pm_lock_.lock();
uint64_t time = GetCurrentTime() + this->timeout_length_;
client_timeout_min_heap_.push(
PerformanceClientTimeout(request->hash(), time));
waiting_response_batches_.insert(
make_pair(request->hash(), std::move(request)));
pm_lock_.unlock();
sem_post(&request_sent_signal_);
}
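
// Stop tracking a batch once its response has been delivered.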
void PerformanceManager::RemoveWaitingResponseRequest(std::string hash) {
if (!config_.GetConfigData().enable_viewchange()) {
return;
}
pm_lock_.lock();
if (waiting_response_batches_.find(hash) != waiting_response_batches_.end()) {
waiting_response_batches_.erase(waiting_response_batches_.find(hash));
}
pm_lock_.unlock();
}
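
// A batch counts as timed out if it is still waiting for a response when its
// deadline fires.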
bool PerformanceManager::CheckTimeOut(std::string hash) {
pm_lock_.lock();
bool value =
(waiting_response_batches_.find(hash) != waiting_response_batches_.end());
pm_lock_.unlock();
return value;
}
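
// Take ownership of the timed-out batch, if it is still being tracked.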
std::unique_ptr<Request> PerformanceManager::GetTimeOutRequest(
    std::string hash) {
  std::unique_ptr<Request> value;
  pm_lock_.lock();
  auto it = waiting_response_batches_.find(hash);
  if (it != waiting_response_batches_.end()) {
    value = std::move(it->second);
  }
  pm_lock_.unlock();
  return value;
}
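
// Background loop: wait until at least one batch has been sent, sleep until
// the earliest deadline, and rebroadcast any batch still awaiting a response.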
void PerformanceManager::MonitoringClientTimeOut() {
while (!stop_) {
sem_wait(&request_sent_signal_);
pm_lock_.lock();
if (client_timeout_min_heap_.empty()) {
pm_lock_.unlock();
continue;
}
auto client_timeout = client_timeout_min_heap_.top();
client_timeout_min_heap_.pop();
pm_lock_.unlock();
    // Cache the clock once so the subtraction cannot underflow if time
    // advances between the comparison and the sleep.
    uint64_t current_time = GetCurrentTime();
    if (client_timeout.timeout_time > current_time) {
      usleep(client_timeout.timeout_time - current_time);
    }
if (CheckTimeOut(client_timeout.hash)) {
auto request = GetTimeOutRequest(client_timeout.hash);
if (request) {
replica_communicator_->BroadCast(*request);
}
}
}
}
} // namespace resdb