platform/consensus/execution/transaction_executor.cpp (371 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "platform/consensus/execution/transaction_executor.h"
#include <glog/logging.h>
#include "common/utils/utils.h"
namespace resdb {
TransactionExecutor::TransactionExecutor(
const ResDBConfig& config, PostExecuteFunc post_exec_func,
SystemInfo* system_info,
std::unique_ptr<TransactionManager> transaction_manager)
: config_(config),
post_exec_func_(post_exec_func),
system_info_(system_info),
transaction_manager_(std::move(transaction_manager)),
commit_queue_("order"),
execute_queue_("execute"),
stop_(false),
duplicate_manager_(nullptr) {
memset(blucket_, 0, sizeof(blucket_));
global_stats_ = Stats::GetGlobalStats();
ordering_thread_ = std::thread(&TransactionExecutor::OrderMessage, this);
for (int i = 0; i < execute_thread_num_; ++i) {
execute_thread_.push_back(
std::thread(&TransactionExecutor::ExecuteMessage, this));
}
for (int i = 0; i < 1; ++i) {
prepare_thread_.push_back(
std::thread(&TransactionExecutor::PrepareMessage, this));
}
if (transaction_manager_ && transaction_manager_->IsOutOfOrder()) {
execute_OOO_thread_ =
std::thread(&TransactionExecutor::ExecuteMessageOutOfOrder, this);
LOG(ERROR) << " is out of order:" << transaction_manager_->IsOutOfOrder();
}
}
TransactionExecutor::~TransactionExecutor() { Stop(); }
void TransactionExecutor::RegisterExecute(int64_t seq) {
if (execute_thread_num_ == 1) return;
int idx = seq % blucket_num_;
std::unique_lock<std::mutex> lk(mutex_);
// LOG(ERROR)<<"register seq:"<<seq<<" bluck:"<<blucket_[idx];
assert(!blucket_[idx] || !(blucket_[idx] ^ 3));
blucket_[idx] = 1;
// LOG(ERROR)<<"register seq:"<<seq;
}
void TransactionExecutor::WaitForExecute(int64_t seq) {
if (execute_thread_num_ == 1) return;
int pre_idx = (seq - 1 + blucket_num_) % blucket_num_;
while (!IsStop()) {
std::unique_lock<std::mutex> lk(mutex_);
cv_.wait_for(lk, std::chrono::milliseconds(10000), [&] {
return ((blucket_[pre_idx] & 2) || !blucket_[pre_idx]);
});
if ((blucket_[pre_idx] & 2) || !blucket_[pre_idx]) {
break;
}
}
// LOG(ERROR)<<"wait for :"<<seq<<" done";
}
void TransactionExecutor::FinishExecute(int64_t seq) {
if (execute_thread_num_ == 1) return;
int idx = seq % blucket_num_;
std::unique_lock<std::mutex> lk(mutex_);
// LOG(ERROR)<<"finish :"<<seq<<" done";
blucket_[idx] = 3;
cv_.notify_all();
}
void TransactionExecutor::Stop() {
stop_ = true;
if (ordering_thread_.joinable()) {
ordering_thread_.join();
}
for (auto& th : execute_thread_) {
if (th.joinable()) {
th.join();
}
}
for (auto& th : prepare_thread_) {
if (th.joinable()) {
th.join();
}
}
if (execute_OOO_thread_.joinable()) {
execute_OOO_thread_.join();
}
}
Storage* TransactionExecutor::GetStorage() {
return transaction_manager_ ? transaction_manager_->GetStorage() : nullptr;
}
void TransactionExecutor::SetPreExecuteFunc(PreExecuteFunc pre_exec_func) {
pre_exec_func_ = pre_exec_func;
}
void TransactionExecutor::SetSeqUpdateNotifyFunc(SeqUpdateNotifyFunc func) {
seq_update_notify_func_ = func;
}
bool TransactionExecutor::IsStop() { return stop_; }
uint64_t TransactionExecutor::GetMaxPendingExecutedSeq() {
return next_execute_seq_ - 1;
}
bool TransactionExecutor::NeedResponse() {
return transaction_manager_ == nullptr ||
transaction_manager_->NeedResponse();
}
int TransactionExecutor::Commit(std::unique_ptr<Request> message) {
global_stats_->IncPendingExecute();
if (transaction_manager_ && transaction_manager_->IsOutOfOrder()) {
// LOG(ERROR)<<"add out of order exe:"<<message->seq()<<" from
// proxy:"<<message->proxy_id();
std::unique_ptr<Request> msg = std::make_unique<Request>(*message);
execute_OOO_queue_.Push(std::move(message));
commit_queue_.Push(std::move(msg));
} else {
commit_queue_.Push(std::move(message));
}
return 0;
}
void TransactionExecutor::AddNewData(std::unique_ptr<Request> message) {
candidates_.insert(std::make_pair(message->seq(), std::move(message)));
}
std::unique_ptr<Request> TransactionExecutor::GetNextData() {
if (candidates_.empty() || candidates_.begin()->first != next_execute_seq_) {
return nullptr;
}
auto res = std::move(candidates_.begin()->second);
if (pre_exec_func_) {
pre_exec_func_(res.get());
}
candidates_.erase(candidates_.begin());
return res;
}
void TransactionExecutor::OrderMessage() {
while (!IsStop()) {
auto message = commit_queue_.Pop();
if (message != nullptr) {
global_stats_->IncExecute();
uint64_t seq = message->seq();
if (next_execute_seq_ > seq) {
// LOG(INFO) << "request seq:" << seq << " has been executed"
// << " next seq:" << next_execute_seq_;
continue;
}
AddNewData(std::move(message));
}
while (!IsStop()) {
std::unique_ptr<Request> message = GetNextData();
if (message == nullptr) {
break;
}
execute_queue_.Push(std::move(message));
next_execute_seq_++;
if (seq_update_notify_func_) {
seq_update_notify_func_(next_execute_seq_);
}
}
}
return;
}
void TransactionExecutor::AddExecuteMessage(std::unique_ptr<Request> message) {
global_stats_->IncCommit();
message->set_commit_time(GetCurrentTime());
execute_queue_.Push(std::move(message));
}
void TransactionExecutor::ExecuteMessage() {
while (!IsStop()) {
auto message = execute_queue_.Pop();
if (message == nullptr) {
continue;
}
bool need_execute = true;
if (transaction_manager_ && transaction_manager_->IsOutOfOrder()) {
need_execute = false;
}
Execute(std::move(message), need_execute);
}
}
void TransactionExecutor::ExecuteMessageOutOfOrder() {
while (!IsStop()) {
auto message = execute_OOO_queue_.Pop();
if (message == nullptr) {
continue;
}
OnlyExecute(std::move(message));
}
}
void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
// Only Execute the request.
BatchUserRequest batch_request;
if (!batch_request.ParseFromString(request->data())) {
LOG(ERROR) << "parse data fail";
}
batch_request.set_seq(request->seq());
batch_request.set_hash(request->hash());
batch_request.set_proxy_id(request->proxy_id());
if (request->has_committed_certs()) {
*batch_request.mutable_committed_certs() = request->committed_certs();
}
// LOG(INFO) << " get request batch size:"
// << batch_request.user_requests_size()<<" proxy
// id:"<<request->proxy_id();
std::unique_ptr<BatchUserResponse> response;
global_stats_->GetTransactionDetails(batch_request);
if (transaction_manager_) {
response = transaction_manager_->ExecuteBatch(batch_request);
}
// global_stats_->IncTotalRequest(batch_request.user_requests_size());
// global_stats_->IncExecuteDone();
}
void TransactionExecutor::Execute(std::unique_ptr<Request> request,
bool need_execute) {
RegisterExecute(request->seq());
std::unique_ptr<BatchUserRequest> batch_request = nullptr;
std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>> data;
std::vector<std::unique_ptr<google::protobuf::Message>> * data_p = nullptr;
BatchUserRequest* batch_request_p = nullptr;
// Execute the request, then send the response back to the user.
if (batch_request_p == nullptr) {
batch_request = std::make_unique<BatchUserRequest>();
if (!batch_request->ParseFromString(request->data())) {
LOG(ERROR) << "parse data fail";
}
batch_request->set_hash(request->hash());
if (request->has_committed_certs()) {
*batch_request->mutable_committed_certs() = request->committed_certs();
}
batch_request->set_seq(request->seq());
batch_request->set_proxy_id(request->proxy_id());
batch_request_p = batch_request.get();
// LOG(ERROR)<<"get data from req:";
} else {
assert(batch_request_p);
batch_request_p->set_seq(request->seq());
batch_request_p->set_proxy_id(request->proxy_id());
// LOG(ERROR)<<" get from cache:"<<uid;
}
assert(batch_request_p);
// LOG(INFO) << " get request batch size:"
// << batch_request.user_requests_size()<<" proxy id:"
// <<request->proxy_id()<<" need execute:"<<need_execute;
std::unique_ptr<BatchUserResponse> response;
global_stats_->GetTransactionDetails(*batch_request_p);
if (transaction_manager_ && need_execute) {
if (execute_thread_num_ == 1) {
response = transaction_manager_->ExecuteBatch(*batch_request_p);
} else {
std::vector<std::unique_ptr<std::string>> response_v;
if(data_p == nullptr) {
int64_t start_time = GetCurrentTime();
data = std::move(transaction_manager_->Prepare(*batch_request_p));
int64_t end_time = GetCurrentTime();
if (end_time - start_time > 10) {
// LOG(ERROR)<<"exec data done:"<<uid<<" wait
// time:"<<(end_time-start_time);
}
data_p = data.get();
}
WaitForExecute(request->seq());
if(data_p->empty() || (*data_p)[0] == nullptr){
response = transaction_manager_->ExecuteBatch(*batch_request_p);
}
else {
response_v = transaction_manager_->ExecuteBatchData(*data_p);
}
FinishExecute(request->seq());
if(response == nullptr){
response = std::make_unique<BatchUserResponse>();
for (auto& s : response_v) {
response->add_response()->swap(*s);
}
}
}
}
// LOG(ERROR)<<" CF = :"<<(cf==1)<<" uid:"<<uid;
if (duplicate_manager_ && batch_request_p) {
duplicate_manager_->AddExecuted(batch_request_p->hash(), batch_request_p->seq());
}
if (response == nullptr) {
response = std::make_unique<BatchUserResponse>();
}
global_stats_->IncTotalRequest(batch_request_p->user_requests_size());
response->set_proxy_id(batch_request_p->proxy_id());
response->set_createtime(batch_request_p->createtime() + request->queuing_time());
response->set_local_id(batch_request_p->local_id());
response->set_seq(request->seq());
if (post_exec_func_) {
post_exec_func_(std::move(request), std::move(response));
}
global_stats_->IncExecuteDone();
}
void TransactionExecutor::SetDuplicateManager(DuplicateManager* manager) {
duplicate_manager_ = manager;
}
bool TransactionExecutor::SetFlag(uint64_t uid, int f) {
std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
auto it = flag_[uid % mod].find(uid);
if (it == flag_[uid % mod].end()) {
flag_[uid % mod][uid] |= f;
// LOG(ERROR)<<"NO FUTURE uid:"<<uid;
return true;
}
assert(it != flag_[uid % mod].end());
if (f == Start_Prepare) {
if (flag_[uid % mod][uid] & Start_Execute) {
return false;
}
} else if(f == Start_Execute){
if (flag_[uid % mod][uid] & End_Prepare) {
//if (flag_[uid % mod][uid] & Start_Prepare) {
return false;
}
}
flag_[uid % mod][uid] |= f;
return true;
}
void TransactionExecutor::ClearPromise(uint64_t uid) {
std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
auto it = pre_[uid % mod].find(uid);
if (it == pre_[uid % mod].end()) {
return;
}
// LOG(ERROR)<<"CLEAR UID:"<<uid;
assert(it != pre_[uid % mod].end());
assert(flag_[uid % mod].find(uid) != flag_[uid % mod].end());
//assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
//assert(req_[uid%mod].find(uid) != req_[uid%mod].end());
//data_[uid%mod].erase(data_[uid%mod].find(uid));
//req_[uid%mod].erase(req_[uid%mod].find(uid));
pre_[uid % mod].erase(it);
flag_[uid % mod].erase(flag_[uid % mod].find(uid));
}
std::promise<int>* TransactionExecutor::GetPromise(uint64_t uid) {
std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
auto it = pre_[uid % mod].find(uid);
if (it == pre_[uid % mod].end()) {
return nullptr;
}
return it->second.get();
}
std::unique_ptr<std::future<int>> TransactionExecutor::GetFuture(uint64_t uid) {
std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
auto it = pre_[uid % mod].find(uid);
if (it == pre_[uid % mod].end()) {
return nullptr;
}
//return std::move(it->second);
// LOG(ERROR)<<"add future:"<<uid;
return std::make_unique<std::future<int>>(it->second->get_future());
}
bool TransactionExecutor::AddFuture(uint64_t uid) {
std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
auto it = pre_[uid % mod].find(uid);
if (it == pre_[uid % mod].end()) {
// LOG(ERROR)<<"add future:"<<uid;
std::unique_ptr<std::promise<int>> p =
std::make_unique<std::promise<int>>();
//auto f = std::make_unique<std::future<int>>(p->get_future());
pre_[uid % mod][uid] = std::move(p);
//pre_f_[uid % mod][uid] = std::move(f);
flag_[uid % mod][uid] = 0;
return true;
}
return false;
}
void TransactionExecutor::Prepare(std::unique_ptr<Request> request) {
if (AddFuture(request->uid())) {
prepare_queue_.Push(std::move(request));
}
}
void TransactionExecutor::PrepareMessage() {
while (!IsStop()) {
std::unique_ptr<Request> request = prepare_queue_.Pop();
if (request == nullptr) {
continue;
}
uint64_t uid = request->uid();
int current_f = SetFlag(uid, Start_Prepare);
if (current_f == 0) {
// commit has done
// LOG(ERROR)<<" want prepare, commit started:"<<uid;
// ClearPromise(uid);
continue;
}
std::promise<int>* p = GetPromise(uid) ;
assert(p);
//LOG(ERROR)<<" prepare started:"<<uid;
// LOG(ERROR)<<" prepare uid:"<<uid;
// Execute the request, then send the response back to the user.
std::unique_ptr<BatchUserRequest> batch_request =
std::make_unique<BatchUserRequest>();
if (!batch_request->ParseFromString(request->data())) {
LOG(ERROR) << "parse data fail";
}
// batch_request = std::make_unique<BatchUserRequest>();
batch_request->set_seq(request->seq());
batch_request->set_hash(request->hash());
batch_request->set_proxy_id(request->proxy_id());
if (request->has_committed_certs()) {
*batch_request->mutable_committed_certs() = request->committed_certs();
}
// LOG(ERROR)<<"prepare seq:"<<batch_request->seq()<<" proxy
// id:"<<request->proxy_id()<<" local id:"<<batch_request->local_id();
std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
request_v = transaction_manager_->Prepare(*batch_request);
{
std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
// assert(request_v);
// assert(data_[uid%mod].find(uid) == data_[uid%mod].end());
data_[uid%mod][uid] = std::move(request_v);
req_[uid % mod][uid] = std::move(batch_request);
}
//LOG(ERROR)<<"set promise:"<<uid;
p->set_value(1);
{
int set_ret = SetFlag(uid, End_Prepare);
if (set_ret == 0) {
// LOG(ERROR)<<"commit interrupt:"<<uid;
//ClearPromise(uid);
} else {
//LOG(ERROR)<<"prepare done:"<<uid;
}
}
}
}
} // namespace resdb